Skip to content
This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 0310431

Browse files
author
Rob Mulholand
authored
Merge pull request #145 from vulcanize/contract-watcher-logging
Increase logging in contract watcher
2 parents b4e16c4 + 4505382 commit 0310431

File tree

16 files changed

+168
-126
lines changed

16 files changed

+168
-126
lines changed

pkg/contract_watcher/full/transformer/transformer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package transformer
1919
import (
2020
"errors"
2121

22+
"github.com/sirupsen/logrus"
23+
2224
"github.com/vulcanize/vulcanizedb/pkg/config"
2325
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/converter"
2426
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever"
@@ -106,7 +108,11 @@ func (tr *Transformer) Init() error {
106108

107109
// Get contract name if it has one
108110
var name = new(string)
109-
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
111+
pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
112+
if pollingErr != nil {
113+
// can't return this error because "name" might not exist on the contract
114+
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
115+
}
110116

111117
// Remove any potential accidental duplicate inputs in arg filter values
112118
eventArgs := map[string]bool{}

pkg/contract_watcher/header/converter/converter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in
132132

133133
// Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs
134134
func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) {
135-
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
135+
boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
136136
eventsToLogs := make(map[string][]types.Log)
137137
for _, event := range events {
138138
eventsToLogs[event.Name] = make([]types.Log, 0, len(logs))
@@ -141,7 +141,7 @@ func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
141141
// If the log is of this event type, process it as such
142142
if event.Sig() == log.Topics[0] {
143143
values := make(map[string]interface{})
144-
err := contract.UnpackLogIntoMap(values, event.Name, log)
144+
err := boundContract.UnpackLogIntoMap(values, event.Name, log)
145145
if err != nil {
146146
return nil, err
147147
}

pkg/contract_watcher/header/repository/header_repository.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/hashicorp/golang-lru"
2323
"github.com/jmoiron/sqlx"
24+
"github.com/sirupsen/logrus"
2425

2526
"github.com/vulcanize/vulcanizedb/pkg/core"
2627
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@@ -148,7 +149,10 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [
148149
pgStr = pgStr[:len(pgStr)-2]
149150
_, err = tx.Exec(pgStr, header.Id)
150151
if err != nil {
151-
tx.Rollback()
152+
rollbackErr := tx.Rollback()
153+
if rollbackErr != nil {
154+
logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error())
155+
}
152156
return err
153157
}
154158
}
@@ -246,6 +250,7 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
246250
// Returns a continuous set of headers
247251
func continuousHeaders(headers []core.Header) []core.Header {
248252
if len(headers) < 1 {
253+
logrus.Trace("no headers to arrange continuously")
249254
return headers
250255
}
251256
previousHeader := headers[0].BlockNumber

pkg/contract_watcher/header/repository/header_repository_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ var _ = Describe("Repository", func() {
137137
h1 := missingHeaders[0]
138138
h2 := missingHeaders[1]
139139
h3 := missingHeaders[2]
140-
Expect(h1.BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
141-
Expect(h2.BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
142-
Expect(h3.BlockNumber).To(Equal(int64(mocks.MockHeader3.BlockNumber)))
140+
Expect(h1.BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
141+
Expect(h2.BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
142+
Expect(h3.BlockNumber).To(Equal(mocks.MockHeader3.BlockNumber))
143143
})
144144

145145
It("Returns only contiguous chunks of headers", func() {
@@ -150,8 +150,8 @@ var _ = Describe("Repository", func() {
150150
missingHeaders, err := contractHeaderRepo.MissingHeaders(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs[0])
151151
Expect(err).ToNot(HaveOccurred())
152152
Expect(len(missingHeaders)).To(Equal(2))
153-
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
154-
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
153+
Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
154+
Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
155155
})
156156

157157
It("Fails if eventID does not yet exist in check_headers table", func() {
@@ -199,8 +199,8 @@ var _ = Describe("Repository", func() {
199199
missingHeaders, err := contractHeaderRepo.MissingHeadersForAll(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs)
200200
Expect(err).ToNot(HaveOccurred())
201201
Expect(len(missingHeaders)).To(Equal(2))
202-
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
203-
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
202+
Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
203+
Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
204204
})
205205

206206
It("returns headers after starting header if starting header not missing", func() {

pkg/contract_watcher/header/retriever/block_retriever_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ var _ = Describe("Block Retriever", func() {
4444

4545
Describe("RetrieveFirstBlock", func() {
4646
It("Retrieves block number of earliest header in the database", func() {
47-
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
48-
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
49-
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
47+
_, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
48+
Expect(err).ToNot(HaveOccurred())
49+
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
50+
Expect(err).ToNot(HaveOccurred())
51+
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
52+
Expect(err).ToNot(HaveOccurred())
5053

5154
i, err := r.RetrieveFirstBlock()
5255
Expect(err).NotTo(HaveOccurred())
@@ -61,9 +64,12 @@ var _ = Describe("Block Retriever", func() {
6164

6265
Describe("RetrieveMostRecentBlock", func() {
6366
It("Retrieves the latest header's block number", func() {
64-
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
65-
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
66-
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
67+
_, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
68+
Expect(err).ToNot(HaveOccurred())
69+
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
70+
Expect(err).ToNot(HaveOccurred())
71+
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
72+
Expect(err).ToNot(HaveOccurred())
6773

6874
i, err := r.RetrieveMostRecentBlock()
6975
Expect(err).ToNot(HaveOccurred())

pkg/contract_watcher/header/transformer/transformer.go

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package transformer
1818

1919
import (
2020
"errors"
21+
"fmt"
2122
"strings"
2223

2324
"github.com/ethereum/go-ethereum/common"
2425
gethTypes "github.com/ethereum/go-ethereum/core/types"
26+
"github.com/sirupsen/logrus"
2527

2628
"github.com/vulcanize/vulcanizedb/pkg/config"
2729
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter"
@@ -107,22 +109,22 @@ func (tr *Transformer) Init() error {
107109
// Configure Abi
108110
if tr.Config.Abis[contractAddr] == "" {
109111
// If no abi is given in the config, this method will try fetching from internal look-up table and etherscan
110-
err := tr.Parser.Parse(contractAddr)
111-
if err != nil {
112-
return err
112+
parseErr := tr.Parser.Parse(contractAddr)
113+
if parseErr != nil {
114+
return fmt.Errorf("error parsing contract by address: %s", parseErr.Error())
113115
}
114116
} else {
115117
// If we have an abi from the config, load that into the parser
116-
err := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
117-
if err != nil {
118-
return err
118+
parseErr := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
119+
if parseErr != nil {
120+
return fmt.Errorf("error parsing contract abi: %s", parseErr.Error())
119121
}
120122
}
121123

122124
// Get first block and most recent block number in the header repo
123-
firstBlock, err := tr.Retriever.RetrieveFirstBlock()
124-
if err != nil {
125-
return err
125+
firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock()
126+
if retrieveErr != nil {
127+
return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())
126128
}
127129

128130
// Set to specified range if it falls within the bounds
@@ -132,7 +134,11 @@ func (tr *Transformer) Init() error {
132134

133135
// Get contract name if it has one
134136
var name = new(string)
135-
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
137+
pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
138+
if pollingErr != nil {
139+
// can't return this error because "name" might not exist on the contract
140+
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
141+
}
136142

137143
// Remove any potential accidental duplicate inputs
138144
eventArgs := map[string]bool{}
@@ -165,9 +171,9 @@ func (tr *Transformer) Init() error {
165171
tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events))
166172
for _, event := range con.Events {
167173
eventId := strings.ToLower(event.Name + "_" + con.Address)
168-
err := tr.HeaderRepository.AddCheckColumn(eventId)
169-
if err != nil {
170-
return err
174+
addColumnErr := tr.HeaderRepository.AddCheckColumn(eventId)
175+
if addColumnErr != nil {
176+
return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
171177
}
172178
// Keep track of this event id; sorted and unsorted
173179
tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId)
@@ -180,9 +186,9 @@ func (tr *Transformer) Init() error {
180186
tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods))
181187
for _, m := range con.Methods {
182188
methodId := strings.ToLower(m.Name + "_" + con.Address)
183-
err := tr.HeaderRepository.AddCheckColumn(methodId)
184-
if err != nil {
185-
return err
189+
addColumnErr := tr.HeaderRepository.AddCheckColumn(methodId)
190+
if addColumnErr != nil {
191+
return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
186192
}
187193
tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId)
188194
}
@@ -202,9 +208,9 @@ func (tr *Transformer) Execute() error {
202208
}
203209

204210
// Find unchecked headers for all events across all contracts; these are returned in asc order
205-
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
206-
if err != nil {
207-
return err
211+
missingHeaders, missingHeadersErr := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
212+
if missingHeadersErr != nil {
213+
return fmt.Errorf("error getting missing headers: %s", missingHeadersErr.Error())
208214
}
209215

210216
// Iterate over headers
@@ -216,23 +222,24 @@ func (tr *Transformer) Execute() error {
216222
// Map to sort batch fetched logs by which contract they belong to, for post fetch processing
217223
sortedLogs := make(map[string][]gethTypes.Log)
218224
// And fetch all event logs across contracts at this header
219-
allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
220-
if err != nil {
221-
return err
225+
allLogs, fetchErr := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
226+
if fetchErr != nil {
227+
return fmt.Errorf("error fetching logs: %s", fetchErr.Error())
222228
}
223229

224230
// If no logs are found mark the header checked for all of these eventIDs
225231
// and continue to method polling and onto the next iteration
226232
if len(allLogs) < 1 {
227-
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
228-
if err != nil {
229-
return err
233+
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
234+
if markCheckedErr != nil {
235+
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
230236
}
231-
err = tr.methodPolling(header, tr.sortedMethodIds)
232-
if err != nil {
233-
return err
237+
pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
238+
if pollingErr != nil {
239+
return fmt.Errorf("error polling methods: %s", pollingErr.Error())
234240
}
235241
tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header
242+
logrus.Tracef("no logs found for block %d, continuing", header.BlockNumber)
236243
continue
237244
}
238245

@@ -245,41 +252,43 @@ func (tr *Transformer) Execute() error {
245252
// Process logs for each contract
246253
for conAddr, logs := range sortedLogs {
247254
if logs == nil {
255+
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
248256
continue
249257
}
250258
// Configure converter with this contract
251259
con := tr.Contracts[conAddr]
252260
tr.Converter.Update(con)
253261

254262
// Convert logs into batches of log mappings (eventName => []types.Logs
255-
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
256-
if err != nil {
257-
return err
263+
convertedLogs, convertErr := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
264+
if convertErr != nil {
265+
return fmt.Errorf("error converting logs: %s", convertErr.Error())
258266
}
259267
// Cycle through each type of event log and persist them
260268
for eventName, logs := range convertedLogs {
261269
// If logs for this event are empty, mark them checked at this header and continue
262270
if len(logs) < 1 {
263271
eventId := strings.ToLower(eventName + "_" + con.Address)
264-
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
265-
if err != nil {
266-
return err
272+
markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
273+
if markCheckedErr != nil {
274+
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
267275
}
276+
logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber)
268277
continue
269278
}
270279
// If logs aren't empty, persist them
271280
// Header is marked checked in the transactions
272-
err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
273-
if err != nil {
274-
return err
281+
persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
282+
if persistErr != nil {
283+
return fmt.Errorf("error persisting logs: %s", persistErr.Error())
275284
}
276285
}
277286
}
278287

279288
// Poll contracts at this block height
280-
err = tr.methodPolling(header, tr.sortedMethodIds)
281-
if err != nil {
282-
return err
289+
pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
290+
if pollingErr != nil {
291+
return fmt.Errorf("error polling methods: %s", pollingErr.Error())
283292
}
284293
// Success; setup to start at the next header
285294
tr.Start = header.BlockNumber + 1
@@ -294,19 +303,20 @@ func (tr *Transformer) methodPolling(header core.Header, sortedMethodIds map[str
294303
// Skip method polling processes if no methods are specified
295304
// Also don't try to poll methods below this contract's specified starting block
296305
if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock {
306+
logrus.Tracef("not polling contract: %s", con.Address)
297307
continue
298308
}
299309

300310
// Poll all methods for this contract at this header
301-
err := tr.Poller.PollContractAt(*con, header.BlockNumber)
302-
if err != nil {
303-
return err
311+
pollingErr := tr.Poller.PollContractAt(*con, header.BlockNumber)
312+
if pollingErr != nil {
313+
return fmt.Errorf("error polling contract %s: %s", con.Address, pollingErr.Error())
304314
}
305315

306316
// Mark this header checked for the methods
307-
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
308-
if err != nil {
309-
return err
317+
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
318+
if markCheckedErr != nil {
319+
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
310320
}
311321
}
312322

pkg/contract_watcher/header/transformer/transformer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ var _ = Describe("Transformer", func() {
6868
err := t.Init()
6969

7070
Expect(err).To(HaveOccurred())
71-
Expect(err).To(MatchError(fakes.FakeError))
71+
Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
7272
})
7373
})
7474

@@ -109,7 +109,7 @@ var _ = Describe("Transformer", func() {
109109
err := t.Init()
110110

111111
Expect(err).To(HaveOccurred())
112-
Expect(err).To(MatchError(fakes.FakeError))
112+
Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
113113
})
114114
})
115115
})

pkg/contract_watcher/shared/constants/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
var SupportsInterfaceABI = `[{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}]`
2525

2626
// Individual event interfaces for constructing ABI from
27-
var SupportsInterace = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
27+
var SupportsInterface = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
2828
var AddrChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"a","type":"address"}],"name":"AddrChanged","type":"event"}`
2929
var ContentChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"hash","type":"bytes32"}],"name":"ContentChanged","type":"event"}`
3030
var NameChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"name","type":"string"}],"name":"NameChanged","type":"event"}`

pkg/contract_watcher/shared/fetcher/fetcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func newFetcherError(err error, fetchMethod string) *fetcherError {
4747

4848
// Fetcher struct
4949
type Fetcher struct {
50-
BlockChain core.BlockChain // Underyling Blockchain
50+
BlockChain core.BlockChain // Underlying Blockchain
5151
}
5252

5353
// Fetcher error

pkg/contract_watcher/shared/helpers/test_helpers/database.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) {
115115
rpcClient := client.NewRpcClient(rawRpcClient, infuraIPC)
116116
ethClient := ethclient.NewClient(rawRpcClient)
117117
blockChainClient := client.NewEthClient(ethClient)
118-
node := node.MakeNode(rpcClient)
118+
madeNode := node.MakeNode(rpcClient)
119119
transactionConverter := rpc2.NewRpcTransactionConverter(ethClient)
120-
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, node, transactionConverter)
120+
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, madeNode, transactionConverter)
121121

122122
db, err := postgres.NewDB(config.Database{
123123
Hostname: "localhost",

0 commit comments

Comments
 (0)