Skip to content
This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit b4e16c4

Browse files
Merge pull request #116 from vulcanize/storage-diffs-over-rpc
Storage diffs over rpc
2 parents 20ce0ab + f0d2741 commit b4e16c4

37 files changed

+1127
-417
lines changed

cmd/composeAndExecute.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
package cmd
1818

1919
import (
20+
"github.com/ethereum/go-ethereum/statediff"
21+
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
22+
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
23+
"github.com/vulcanize/vulcanizedb/pkg/fs"
2024
"os"
2125
"plugin"
2226
syn "sync"
@@ -25,9 +29,7 @@ import (
2529
log "github.com/sirupsen/logrus"
2630
"github.com/spf13/cobra"
2731

28-
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
2932
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
30-
"github.com/vulcanize/vulcanizedb/pkg/fs"
3133
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
3234
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
3335
"github.com/vulcanize/vulcanizedb/utils"
@@ -179,12 +181,26 @@ func composeAndExecute() {
179181
}
180182

181183
if len(ethStorageInitializers) > 0 {
182-
tailer := fs.FileTailer{Path: storageDiffsPath}
183-
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
184-
sw := watcher.NewStorageWatcher(storageFetcher, &db)
185-
sw.AddTransformers(ethStorageInitializers)
186-
wg.Add(1)
187-
go watchEthStorage(&sw, &wg)
184+
switch storageDiffsSource {
185+
case "geth":
186+
log.Debug("fetching storage diffs from geth pub sub")
187+
rpcClient, _ := getClients()
188+
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
189+
payloadChan := make(chan statediff.Payload)
190+
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
191+
sw := watcher.NewStorageWatcher(storageFetcher, &db)
192+
sw.AddTransformers(ethStorageInitializers)
193+
wg.Add(1)
194+
go watchEthStorage(&sw, &wg)
195+
default:
196+
log.Debug("fetching storage diffs from csv")
197+
tailer := fs.FileTailer{Path: storageDiffsPath}
198+
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
199+
sw := watcher.NewStorageWatcher(storageFetcher, &db)
200+
sw.AddTransformers(ethStorageInitializers)
201+
wg.Add(1)
202+
go watchEthStorage(&sw, &wg)
203+
}
188204
}
189205

190206
if len(ethContractInitializers) > 0 {

cmd/execute.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ package cmd
1818

1919
import (
2020
"fmt"
21+
"github.com/ethereum/go-ethereum/statediff"
22+
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
23+
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
24+
"github.com/vulcanize/vulcanizedb/pkg/fs"
2125
"plugin"
2226
syn "sync"
2327
"time"
@@ -26,11 +30,9 @@ import (
2630
"github.com/spf13/cobra"
2731

2832
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
29-
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
3033
storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
3134
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
3235
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
33-
"github.com/vulcanize/vulcanizedb/pkg/fs"
3436
"github.com/vulcanize/vulcanizedb/utils"
3537
)
3638

@@ -123,12 +125,26 @@ func execute() {
123125
}
124126

125127
if len(ethStorageInitializers) > 0 {
126-
tailer := fs.FileTailer{Path: storageDiffsPath}
127-
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
128-
sw := watcher.NewStorageWatcher(storageFetcher, &db)
129-
sw.AddTransformers(ethStorageInitializers)
130-
wg.Add(1)
131-
go watchEthStorage(&sw, &wg)
128+
switch storageDiffsSource {
129+
case "geth":
130+
log.Debug("fetching storage diffs from geth pub sub")
131+
rpcClient, _ := getClients()
132+
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
133+
payloadChan := make(chan statediff.Payload)
134+
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
135+
sw := watcher.NewStorageWatcher(storageFetcher, &db)
136+
sw.AddTransformers(ethStorageInitializers)
137+
wg.Add(1)
138+
go watchEthStorage(&sw, &wg)
139+
default:
140+
log.Debug("fetching storage diffs from csv")
141+
tailer := fs.FileTailer{Path: storageDiffsPath}
142+
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
143+
sw := watcher.NewStorageWatcher(storageFetcher, &db)
144+
sw.AddTransformers(ethStorageInitializers)
145+
wg.Add(1)
146+
go watchEthStorage(&sw, &wg)
147+
}
132148
}
133149

134150
if len(ethContractInitializers) > 0 {
@@ -166,16 +182,16 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
166182
}
167183
}
168184

169-
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
185+
func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
170186
defer wg.Done()
171187
// Execute over the StorageTransformerInitializer set using the storage watcher
172188
LogWithCommand.Info("executing storage transformers")
173189
ticker := time.NewTicker(pollingInterval)
174190
defer ticker.Stop()
175191
for range ticker.C {
176192
errs := make(chan error)
177-
rows := make(chan storageUtils.StorageDiffRow)
178-
w.Execute(rows, errs, queueRecheckInterval)
193+
diffs := make(chan storageUtils.StorageDiff)
194+
w.Execute(diffs, errs, queueRecheckInterval)
179195
}
180196
}
181197

cmd/root.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
recheckHeadersArg bool
4949
SubCommand string
5050
LogWithCommand log.Entry
51+
storageDiffsSource string
5152
)
5253

5354
const (
@@ -80,6 +81,7 @@ func setViperConfigs() {
8081
ipc = viper.GetString("client.ipcpath")
8182
levelDbPath = viper.GetString("client.leveldbpath")
8283
storageDiffsPath = viper.GetString("filesystem.storageDiffsPath")
84+
storageDiffsSource = viper.GetString("storageDiffs.source")
8385
databaseConfig = config.Database{
8486
Name: viper.GetString("database.name"),
8587
Hostname: viper.GetString("database.hostname"),
@@ -118,6 +120,7 @@ func init() {
118120
rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file")
119121
rootCmd.PersistentFlags().String("client-levelDbPath", "", "location of levelDb chaindata")
120122
rootCmd.PersistentFlags().String("filesystem-storageDiffsPath", "", "location of storage diffs csv file")
123+
rootCmd.PersistentFlags().String("storageDiffs-source", "csv", "where to get the state diffs: csv or geth")
121124
rootCmd.PersistentFlags().String("exporter-name", "exporter", "name of exporter plugin")
122125
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic")
123126

@@ -129,6 +132,7 @@ func init() {
129132
viper.BindPFlag("client.ipcPath", rootCmd.PersistentFlags().Lookup("client-ipcPath"))
130133
viper.BindPFlag("client.levelDbPath", rootCmd.PersistentFlags().Lookup("client-levelDbPath"))
131134
viper.BindPFlag("filesystem.storageDiffsPath", rootCmd.PersistentFlags().Lookup("filesystem-storageDiffsPath"))
135+
viper.BindPFlag("storageDiffs.source", rootCmd.PersistentFlags().Lookup("storageDiffs-source"))
132136
viper.BindPFlag("exporter.fileName", rootCmd.PersistentFlags().Lookup("exporter-name"))
133137
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
134138
}
@@ -152,15 +156,21 @@ func initConfig() {
152156
}
153157

154158
func getBlockChain() *geth.BlockChain {
159+
rpcClient, ethClient := getClients()
160+
vdbEthClient := client.NewEthClient(ethClient)
161+
vdbNode := node.MakeNode(rpcClient)
162+
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
163+
return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
164+
}
165+
166+
func getClients() (client.RpcClient, *ethclient.Client) {
155167
rawRpcClient, err := rpc.Dial(ipc)
156168

157169
if err != nil {
158170
LogWithCommand.Fatal(err)
159171
}
160172
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
161173
ethClient := ethclient.NewClient(rawRpcClient)
162-
vdbEthClient := client.NewEthClient(ethClient)
163-
vdbNode := node.MakeNode(rpcClient)
164-
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
165-
return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
174+
175+
return rpcClient, ethClient
166176
}

go.mod

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ require (
1212
github.com/ethereum/go-ethereum v1.9.5
1313
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
1414
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
15+
github.com/go-sql-driver/mysql v1.4.1 // indirect
16+
github.com/golang/protobuf v1.3.2 // indirect
1517
github.com/gorilla/websocket v1.4.1 // indirect
16-
github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 // indirect
17-
github.com/hashicorp/golang-lru v0.5.1
18+
github.com/hashicorp/golang-lru v0.5.3
19+
github.com/howeyc/fsnotify v0.9.0 // indirect
1820
github.com/hpcloud/tail v1.0.0
1921
github.com/huin/goupnp v1.0.0 // indirect
2022
github.com/jackpal/go-nat-pmp v1.0.1 // indirect
@@ -42,8 +44,14 @@ require (
4244
github.com/syndtr/goleveldb v1.0.0 // indirect
4345
github.com/tyler-smith/go-bip39 v1.0.2 // indirect
4446
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect
47+
golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794 // indirect
4548
golang.org/x/net v0.0.0-20190603091049-60506f45cf65
4649
golang.org/x/sync v0.0.0-20190423024810-112230192c58
4750
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff // indirect
4851
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
52+
gopkg.in/urfave/cli.v1 v1.0.0-00010101000000-000000000000 // indirect
4953
)
54+
55+
replace github.com/ethereum/go-ethereum => github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101
56+
57+
replace gopkg.in/urfave/cli.v1 => gopkg.in/urfave/cli.v1 v1.20.0

go.sum

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6/go.mod h1
9191
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
9292
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
9393
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
94+
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
95+
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
9496
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
9597
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
9698
github.com/howeyc/fsnotify v0.9.0 h1:0gtV5JmOKH4A8SsFxG2BczSeXWWPvcMT0euZt5gDAxY=
@@ -241,6 +243,10 @@ github.com/tyler-smith/go-bip39 v1.0.0/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2
241243
github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8=
242244
github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
243245
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
246+
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 h1:fsHhBzscAwi4u7/F033SFJwTIz+46D8uDWMu2/ZdvzA=
247+
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 h1:fsHhBzscAwi4u7/F033SFJwTIz+46D8uDWMu2/ZdvzA=
248+
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101/go.mod h1:9i0pGnKDUFFr8yC/n8xyrNBVfhYlpwE8J3Ge6ThKvug=
249+
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101/go.mod h1:9i0pGnKDUFFr8yC/n8xyrNBVfhYlpwE8J3Ge6ThKvug=
244250
github.com/vulcanize/vulcanizedb v0.0.5/go.mod h1:utXkheCL9VjTfmuivuvRiAAyHh54GSN9XRQNEbFCA8k=
245251
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk=
246252
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees=
@@ -252,6 +258,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol
252258
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
253259
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
254260
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
261+
golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794 h1:4Yo9XtTfxfBCecLiBW8TYsFIdN7TkDhjGLWetFo4JSo=
262+
golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
255263
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
256264
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
257265
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U=
@@ -281,6 +289,8 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aW
281289
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
282290
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
283291
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
292+
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69 h1:rOhMmluY6kLMhdnrivzec6lLgaVbMHMn2ISQXJeJ5EM=
293+
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
284294
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
285295
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
286296
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
@@ -300,6 +310,7 @@ gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff h1:uuol9OUzSv
300310
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
301311
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
302312
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
313+
gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0=
303314
gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0=
304315
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
305316
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

libraries/shared/factories/storage/transformer.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,16 @@ package storage
1818

1919
import (
2020
"github.com/ethereum/go-ethereum/common"
21-
2221
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
2322
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
2423
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
2524
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
2625
)
2726

2827
type Transformer struct {
29-
Address common.Address
30-
Mappings storage.Mappings
31-
Repository Repository
28+
HashedAddress common.Hash
29+
Mappings storage.Mappings
30+
Repository Repository
3231
}
3332

3433
func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.StorageTransformer {
@@ -37,18 +36,18 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Stora
3736
return transformer
3837
}
3938

40-
func (transformer Transformer) ContractAddress() common.Address {
41-
return transformer.Address
39+
func (transformer Transformer) KeccakContractAddress() common.Hash {
40+
return transformer.HashedAddress
4241
}
4342

44-
func (transformer Transformer) Execute(row utils.StorageDiffRow) error {
45-
metadata, lookupErr := transformer.Mappings.Lookup(row.StorageKey)
43+
func (transformer Transformer) Execute(diff utils.StorageDiff) error {
44+
metadata, lookupErr := transformer.Mappings.Lookup(diff.StorageKey)
4645
if lookupErr != nil {
4746
return lookupErr
4847
}
49-
value, decodeErr := utils.Decode(row, metadata)
48+
value, decodeErr := utils.Decode(diff, metadata)
5049
if decodeErr != nil {
5150
return decodeErr
5251
}
53-
return transformer.Repository.Create(row.BlockHeight, row.BlockHash.Hex(), metadata, value)
52+
return transformer.Repository.Create(diff.BlockHeight, diff.BlockHash.Hex(), metadata, value)
5453
}

0 commit comments

Comments
 (0)