Skip to content

Commit d067d2d

Browse files
authored
Merge pull request #65 from vulcanize/v1.10.2-statediff-0.0.19
improve error logging; handle PushBlock internal err
2 parents 2a11d8f + 77557ea commit d067d2d

File tree

4 files changed

+60
-54
lines changed

4 files changed

+60
-54
lines changed

statediff/doc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ This service introduces a CLI flag namespace `statediff`
7979

8080
`--statediff` flag is used to turn on the service
8181
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
82+
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
8283
`--statediff.db` is the connection string for the Postgres database to write to
8384
`--statediff.dbnodeid` is the node id to use in the Postgres database
8485
`--statediff.dbclientname` is the client name to use in the Postgres database

statediff/indexer/indexer.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
111111
// Generate the block iplds
112112
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(block, receipts)
113113
if err != nil {
114-
return nil, err
114+
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
115115
}
116116
if len(txNodes) != len(txTrieNodes) && len(rctNodes) != len(rctTrieNodes) && len(txNodes) != len(rctNodes) {
117117
return nil, fmt.Errorf("expected number of transactions (%d), transaction trie nodes (%d), receipts (%d), and receipt trie nodes (%d)to be equal", len(txNodes), len(txTrieNodes), len(rctNodes), len(rctTrieNodes))
@@ -124,7 +124,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
124124
if err != nil {
125125
return nil, err
126126
}
127-
blocktx := BlockTx{
127+
defer func() {
128+
if p := recover(); p != nil {
129+
shared.Rollback(tx)
130+
panic(p)
131+
} else if err != nil {
132+
shared.Rollback(tx)
133+
}
134+
}()
135+
blockTx := &BlockTx{
128136
dbtx: tx,
129137
// handle transaction commit or rollback for any return case
130138
Close: func(err error) error {
@@ -164,15 +172,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
164172
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
165173
t = time.Now()
166174
// Publish and index uncles
167-
if err := sdi.processUncles(tx, headerID, height, uncleNodes); err != nil {
175+
err = sdi.processUncles(tx, headerID, height, uncleNodes)
176+
if err != nil {
168177
return nil, err
169178
}
170179
tDiff = time.Since(t)
171180
indexerMetrics.tUncleProcessing.Update(tDiff)
172181
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
173182
t = time.Now()
174183
// Publish and index receipts and txs
175-
if err := sdi.processReceiptsAndTxs(tx, processArgs{
184+
err = sdi.processReceiptsAndTxs(tx, processArgs{
176185
headerID: headerID,
177186
blockNumber: block.Number(),
178187
receipts: receipts,
@@ -181,25 +190,26 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
181190
rctTrieNodes: rctTrieNodes,
182191
txNodes: txNodes,
183192
txTrieNodes: txTrieNodes,
184-
}); err != nil {
193+
})
194+
if err != nil {
185195
return nil, err
186196
}
187197
tDiff = time.Since(t)
188198
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
189199
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
190200
t = time.Now()
191201

192-
blocktx.BlockNumber = height
193-
blocktx.headerID = headerID
194-
return &blocktx, err
202+
blockTx.BlockNumber = height
203+
blockTx.headerID = headerID
204+
return blockTx, err
195205
}
196206

197207
// processHeader publishes and indexes a header IPLD in Postgres
198208
// it returns the headerID
199209
func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
200210
// publish header
201211
if err := shared.PublishIPLD(tx, headerNode); err != nil {
202-
return 0, err
212+
return 0, fmt.Errorf("error publishing header IPLD: %v", err)
203213
}
204214
// index header
205215
return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
@@ -223,7 +233,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNum
223233
// publish and index uncles
224234
for _, uncleNode := range uncleNodes {
225235
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
226-
return err
236+
return fmt.Errorf("error publishing uncle IPLD: %v", err)
227237
}
228238
uncleReward := CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
229239
uncle := models.UncleModel{
@@ -261,24 +271,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
261271
trx := args.txs[i]
262272
from, err := types.Sender(signer, trx)
263273
if err != nil {
264-
return err
274+
return fmt.Errorf("error deriving tx sender: %v", err)
265275
}
266276

267277
// Publishing
268278
// publish trie nodes, these aren't indexed directly
269279
if err := shared.PublishIPLD(tx, args.txTrieNodes[i]); err != nil {
270-
return err
280+
return fmt.Errorf("error publishing tx trie node IPLD: %v", err)
271281
}
272282
if err := shared.PublishIPLD(tx, args.rctTrieNodes[i]); err != nil {
273-
return err
283+
return fmt.Errorf("error publishing rct trie node IPLD: %v", err)
274284
}
275285
// publish the txs and receipts
276286
txNode, rctNode := args.txNodes[i], args.rctNodes[i]
277287
if err := shared.PublishIPLD(tx, txNode); err != nil {
278-
return err
288+
return fmt.Errorf("error publishing tx IPLD: %v", err)
279289
}
280290
if err := shared.PublishIPLD(tx, rctNode); err != nil {
281-
return err
291+
return fmt.Errorf("error publishing rct IPLD: %v", err)
282292
}
283293

284294
// Indexing
@@ -344,7 +354,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
344354
// publish the state node
345355
stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
346356
if err != nil {
347-
return err
357+
return fmt.Errorf("error publishing state node IPLD: %v", err)
348358
}
349359
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
350360
stateModel := models.StateNodeModel{
@@ -386,7 +396,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
386396
for _, storageNode := range stateNode.StorageNodes {
387397
storageCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
388398
if err != nil {
389-
return err
399+
return fmt.Errorf("error publishing storage node IPLD: %v", err)
390400
}
391401
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
392402
storageModel := models.StorageNodeModel{
@@ -409,10 +419,10 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd
409419
// codec doesn't matter since db key is multihash-based
410420
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
411421
if err != nil {
412-
return err
422+
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
413423
}
414424
if err := shared.PublishDirect(tx.dbtx, mhKey, codeAndCodeHash.Code); err != nil {
415-
return err
425+
return fmt.Errorf("error publishing code IPLD: %v", err)
416426
}
417427
return nil
418428
}

statediff/indexer/writer.go

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
package indexer
1818

1919
import (
20+
"fmt"
21+
2022
"github.com/jmoiron/sqlx"
2123

2224
"github.com/ethereum/go-ethereum/common"
2325
"github.com/ethereum/go-ethereum/statediff/indexer/models"
2426
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
25-
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
2627
)
2728

2829
var (
@@ -49,36 +50,19 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo
4950
RETURNING id`,
5051
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
5152
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
52-
if err == nil {
53-
indexerMetrics.blocks.Inc(1)
53+
if err != nil {
54+
return 0, fmt.Errorf("error upserting header_cids entry: %v", err)
5455
}
55-
return headerID, err
56+
indexerMetrics.blocks.Inc(1)
57+
return headerID, nil
5658
}
5759

5860
func (in *PostgresCIDWriter) upsertUncleCID(tx *sqlx.Tx, uncle models.UncleModel, headerID int64) error {
5961
_, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
6062
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)`,
6163
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
62-
return err
63-
}
64-
65-
func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payload shared.CIDPayload, headerID int64) error {
66-
for _, trxCidMeta := range payload.TransactionCIDs {
67-
var txID int64
68-
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
69-
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8)
70-
RETURNING id`,
71-
headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey, trxCidMeta.Data).Scan(&txID)
72-
if err != nil {
73-
return err
74-
}
75-
indexerMetrics.transactions.Inc(1)
76-
receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)]
77-
if ok {
78-
if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil {
79-
return err
80-
}
81-
}
64+
if err != nil {
65+
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
8266
}
8367
return nil
8468
}
@@ -89,20 +73,22 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model
8973
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8)
9074
RETURNING id`,
9175
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID)
92-
if err == nil {
93-
indexerMetrics.transactions.Inc(1)
76+
if err != nil {
77+
return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err)
9478
}
95-
return txID, err
79+
indexerMetrics.transactions.Inc(1)
80+
return txID, nil
9681
}
9782

9883
func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptModel, txID int64) error {
9984
_, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key, post_state, post_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
10085
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key, post_state, post_status) = ($2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
10186
txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey, rct.PostState, rct.PostStatus)
102-
if err == nil {
103-
indexerMetrics.receipts.Inc(1)
87+
if err != nil {
88+
return fmt.Errorf("error upserting receipt_cids entry: %v", err)
10489
}
105-
return err
90+
indexerMetrics.receipts.Inc(1)
91+
return nil
10692
}
10793

10894
func (in *PostgresCIDWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) {
@@ -115,14 +101,20 @@ func (in *PostgresCIDWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateN
115101
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
116102
RETURNING id`,
117103
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID)
118-
return stateID, err
104+
if err != nil {
105+
return 0, fmt.Errorf("error upserting state_cids entry: %v", err)
106+
}
107+
return stateID, nil
119108
}
120109

121110
func (in *PostgresCIDWriter) upsertStateAccount(tx *sqlx.Tx, stateAccount models.StateAccountModel, stateID int64) error {
122111
_, err := tx.Exec(`INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5)
123112
ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`,
124113
stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
125-
return err
114+
if err != nil {
115+
return fmt.Errorf("error upserting state_accounts entry: %v", err)
116+
}
117+
return nil
126118
}
127119

128120
func (in *PostgresCIDWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.StorageNodeModel, stateID int64) error {
@@ -133,5 +125,8 @@ func (in *PostgresCIDWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.Sto
133125
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
134126
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
135127
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
136-
return err
128+
if err != nil {
129+
return fmt.Errorf("error upserting storage_cids entry: %v", err)
130+
}
131+
return nil
137132
}

statediff/service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,11 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
634634
receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
635635
}
636636
tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
637-
// defer handling of commit/rollback for any return case
638-
defer tx.Close(err)
639637
if err != nil {
640638
return err
641639
}
640+
// defer handling of commit/rollback for any return case
641+
defer tx.Close(err)
642642
output := func(node StateNode) error {
643643
return sds.indexer.PushStateNode(tx, node)
644644
}

0 commit comments

Comments
 (0)