@@ -25,15 +25,15 @@ import (
2525 node "github.com/ipfs/go-ipld-format"
2626
2727 "github.com/ethereum/go-ethereum/common"
28- "github.com/ethereum/go-ethereum/log"
2928 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
3029 "github.com/ethereum/go-ethereum/statediff/indexer/models"
3130 nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
3231)
3332
3433var (
3534 nullHash = common .HexToHash ("0x0000000000000000000000000000000000000000000000000000000000000000" )
36- collatedStmtSize = 65336 // min(linuxPipeSize, macOSPipeSize)
35+ pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize)
36+ collatedStmtSize = pipeSize * 16
3737)
3838
3939// SQLWriter writes sql statements to a file
@@ -43,18 +43,22 @@ type SQLWriter struct {
4343 collatedStmt []byte
4444 collationIndex int
4545
46- quitChan chan struct {}
47- doneChan chan struct {}
46+ flushChan chan struct {}
47+ flushFinished chan struct {}
48+ quitChan chan struct {}
49+ doneChan chan struct {}
4850}
4951
5052// NewSQLWriter creates a new pointer to a Writer
5153func NewSQLWriter (file * os.File ) * SQLWriter {
5254 return & SQLWriter {
53- file : file ,
54- stmts : make (chan []byte ),
55- collatedStmt : make ([]byte , collatedStmtSize ),
56- quitChan : make (chan struct {}),
57- doneChan : make (chan struct {}),
55+ file : file ,
56+ stmts : make (chan []byte ),
57+ collatedStmt : make ([]byte , collatedStmtSize ),
58+ flushChan : make (chan struct {}),
59+ flushFinished : make (chan struct {}),
60+ quitChan : make (chan struct {}),
61+ doneChan : make (chan struct {}),
5862 }
5963}
6064
@@ -72,16 +76,21 @@ func (sqw *SQLWriter) Loop() {
7276 l = len (stmt )
7377 if l + sqw .collationIndex + 1 > collatedStmtSize {
7478 if err := sqw .flush (); err != nil {
75- log . Error ("error writing cached sql stmts to file" , "err" , err )
79+ panic ( fmt . Sprintf ("error writing sql stmts buffer to file: %v" , err ) )
7680 }
7781 }
78- copy (sqw .collatedStmt [sqw .collationIndex :sqw .collationIndex + l - 1 ], stmt )
82+ copy (sqw .collatedStmt [sqw .collationIndex :sqw .collationIndex + l ], stmt )
7983 sqw .collationIndex += l
8084 case <- sqw .quitChan :
8185 if err := sqw .flush (); err != nil {
82- log . Error ("error writing cached sql stmts to file" , "err" , err )
86+ panic ( fmt . Sprintf ("error writing sql stmts buffer to file: %v" , err ) )
8387 }
8488 return
89+ case <- sqw .flushChan :
90+ if err := sqw .flush (); err != nil {
91+ panic (fmt .Sprintf ("error writing sql stmts buffer to file: %v" , err ))
92+ }
93+ sqw .flushFinished <- struct {}{}
8594 }
8695 }
8796 }()
@@ -94,55 +103,58 @@ func (sqw *SQLWriter) Close() error {
94103 return nil
95104}
96105
106+ // Flush sends a flush signal to the looping process
107+ func (sqw * SQLWriter ) Flush () {
108+ sqw .flushChan <- struct {}{}
109+ <- sqw .flushFinished
110+ }
111+
97112func (sqw * SQLWriter ) flush () error {
98- if _ , err := sqw .file .Write (sqw .collatedStmt [0 : sqw .collationIndex - 1 ]); err != nil {
113+ if _ , err := sqw .file .Write (sqw .collatedStmt [0 : sqw .collationIndex ]); err != nil {
99114 return err
100115 }
101116 sqw .collationIndex = 0
102117 return nil
103118}
104119
105120const (
106- nodeInsert = ` INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d)
107- ON CONFLICT (node_id) DO NOTHING ;\n`
121+ nodeInsert = " INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " +
122+ "('%s', '%s', '%s', '%s', %d) ;\n "
108123
109- ipldInsert = ` INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING ;\n`
124+ ipldInsert = " INSERT INTO public.blocks (key, data) VALUES ('%s', '%x') ;\n "
110125
111- headerInsert = ` INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
112- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d)
113- ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s , %d, %s, eth.header_cids.times_validated + 1 , %d);\n`
126+ headerInsert = " INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
127+ "state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
128+ "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x' , %d, '%s', %d , %d);\n "
114129
115- headerInsertWithoutBaseFee = ` INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
116- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL)
117- ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s , %d, %s, eth.header_cids.times_validated + 1 , NULL);\n`
130+ headerInsertWithoutBaseFee = " INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
131+ "reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
132+ "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x' , %d, '%s', %d , NULL);\n "
118133
119- uncleInsert = ` INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s)
120- ON CONFLICT (block_hash) DO NOTHING ;\n`
134+ uncleInsert = " INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
135+ "('%s', '%s', '%s', '%s', '%s', '%s') ;\n "
121136
122- txInsert = ` INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %d)
123- ON CONFLICT (tx_hash) DO NOTHING ;\n`
137+ txInsert = " INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
138+ "VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%x', %d) ;\n "
124139
125- alInsert = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (%s, %d, %s, %s)
126- ON CONFLICT (tx_id, index) DO NOTHING;\n`
140+ alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n "
127141
128- rctInsert = ` INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (%s, %s, %s, %s, %s, %s, %d, %s)
129- ON CONFLICT (tx_id) DO NOTHING ;\n`
142+ rctInsert = " INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " +
143+ "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s') ;\n "
130144
131- logInsert = ` INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (%s, %s, %s, %s, %d, %s, %s, %s, %s, %s)
132- ON CONFLICT (rct_id, index) DO NOTHING ;\n`
145+ logInsert = " INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
146+ "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '%x') ;\n "
133147
134- stateInsert = ` INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %d, %t, %s)
135- ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (%s, %s , %d, %t, %s );\n`
148+ stateInsert = " INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
149+ "VALUES ('%s', '%s', '%s', '%x' , %d, %t, '%s' );\n "
136150
137- accountInsert = ` INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES (%s, %s, %s, %d, %s, %s)
138- ON CONFLICT (header_id, state_path) DO NOTHING ;\n`
151+ accountInsert = " INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " +
152+ "VALUES ('%s', '%x', '%s', %d, '%x', '%s') ;\n "
139153
140- storageInsert = ` INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %s, %d, %t, %s)
141- ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s , %d, %t, %s );\n`
154+ storageInsert = " INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
155+ "node_type, diff, mh_key) VALUES ('%s', '%x', '%s', '%s', '%x' , %d, %t, '%s' );\n "
142156)
143157
144- // ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n`
145-
146158func (sqw * SQLWriter ) upsertNode (node nodeinfo.Info ) {
147159 sqw .stmts <- []byte (fmt .Sprintf (nodeInsert , node .GenesisBlock , node .NetworkID , node .ID , node .ClientName , node .ChainID ))
148160}
@@ -183,15 +195,11 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
183195 if header .BaseFee == nil {
184196 stmt = fmt .Sprintf (headerInsertWithoutBaseFee , header .BlockNumber , header .BlockHash , header .ParentHash , header .CID ,
185197 header .TotalDifficulty , header .NodeID , header .Reward , header .StateRoot , header .TxRoot ,
186- header .RctRoot , header .UncleRoot , header .Bloom , header .Timestamp , header .MhKey , 1 ,
187- header .ParentHash , header .CID , header .TotalDifficulty , header .NodeID , header .Reward , header .StateRoot ,
188- header .TxRoot , header .RctRoot , header .UncleRoot , header .Bloom , header .Timestamp , header .MhKey )
198+ header .RctRoot , header .UncleRoot , header .Bloom , header .Timestamp , header .MhKey , 1 )
189199 } else {
190200 stmt = fmt .Sprintf (headerInsert , header .BlockNumber , header .BlockHash , header .ParentHash , header .CID ,
191201 header .TotalDifficulty , header .NodeID , header .Reward , header .StateRoot , header .TxRoot ,
192- header .RctRoot , header .UncleRoot , header .Bloom , header .Timestamp , header .MhKey , 1 , header .BaseFee ,
193- header .ParentHash , header .CID , header .TotalDifficulty , header .NodeID , header .Reward , header .StateRoot ,
194- header .TxRoot , header .RctRoot , header .UncleRoot , header .Bloom , header .Timestamp , header .MhKey , header .BaseFee )
202+ header .RctRoot , header .UncleRoot , header .Bloom , header .Timestamp , header .MhKey , 1 , header .BaseFee )
195203 }
196204 sqw .stmts <- []byte (stmt )
197205 indexerMetrics .blocks .Inc (1 )
@@ -228,8 +236,8 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
228236 if stateNode .StateKey != nullHash .String () {
229237 stateKey = stateNode .StateKey
230238 }
231- sqw .stmts <- []byte (fmt .Sprintf (stateInsert , stateNode .HeaderID , stateKey , stateNode .CID , stateNode .Path , stateNode . NodeType ,
232- true , stateNode . MhKey , stateKey , stateNode . CID , stateNode .NodeType , true , stateNode .MhKey ))
239+ sqw .stmts <- []byte (fmt .Sprintf (stateInsert , stateNode .HeaderID , stateKey , stateNode .CID , stateNode .Path ,
240+ stateNode .NodeType , true , stateNode .MhKey ))
233241}
234242
235243func (sqw * SQLWriter ) upsertStateAccount (stateAccount models.StateAccountModel ) {
@@ -243,6 +251,5 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
243251 storageKey = storageCID .StorageKey
244252 }
245253 sqw .stmts <- []byte (fmt .Sprintf (storageInsert , storageCID .HeaderID , storageCID .StatePath , storageKey , storageCID .CID ,
246- storageCID .Path , storageCID .NodeType , true , storageCID .MhKey , storageKey , storageCID .CID , storageCID .NodeType ,
247- true , storageCID .MhKey ))
254+ storageCID .Path , storageCID .NodeType , true , storageCID .MhKey ))
248255}
0 commit comments