Skip to content

Commit 0f68228

Browse files
committed
ethevents: improve web3 failure management
- fix data race and make it race free (hopefully) - control over the block number to detect frozen web3 endpoints - clean a bit the code (remove all legacy stuff) - manage web3 endpoints in a better way If a web3 connection fails, then the next one is directly choosen (if any), instead of keeping to reconnect with the one already failed. TODO: apply the same mechanism on the signaling oracle Signed-off-by: p4u <[email protected]>
1 parent 25171f6 commit 0f68228

File tree

8 files changed

+144
-126
lines changed

8 files changed

+144
-126
lines changed

cmd/dvotenode/dvotenode.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,6 @@ func newConfig() (*config.DvoteCfg, config.Error) {
104104
// ethereum node
105105
globalCfg.EthConfig.SigningKey = *flag.StringP("ethSigningKey", "k", "",
106106
"signing private Key (if not specified the Ethereum keystore will be used)")
107-
// ethereum events
108-
globalCfg.EthEventConfig.SubscribeOnly = *flag.Bool("ethSubscribeOnly", true,
109-
"only subscribe to new ethereum events (do not read past log)")
110107
// ethereum web3
111108
globalCfg.W3Config.ChainType = *flag.StringP("ethChain", "c", "goerli",
112109
fmt.Sprintf("Ethereum blockchain to use: %s", ethchain.AvailableChains))
@@ -223,7 +220,6 @@ func newConfig() (*config.DvoteCfg, config.Error) {
223220

224221
// ethereum node
225222
viper.BindPFlag("ethConfig.SigningKey", flag.Lookup("ethSigningKey"))
226-
viper.BindPFlag("ethEventConfig.SubscribeOnly", flag.Lookup("ethSubscribeOnly"))
227223

228224
// ethereum web3
229225
viper.BindPFlag("w3Config.ChainType", flag.Lookup("ethChain"))
@@ -579,20 +575,9 @@ func main() {
579575
}
580576
// Start ethereum events (if at least one web3 endpoint configured)
581577
if len(w3uris) > 0 {
582-
log.Infof("using %+v web3 endpoints", w3uris)
583-
584578
var evh []ethevents.EventHandler
585579
evh = append(evh, ethevents.HandleVochainOracle)
586580

587-
var initBlock int64
588-
if !globalCfg.EthEventConfig.SubscribeOnly {
589-
chainSpecs, err := ethchain.SpecsFor(globalCfg.W3Config.ChainType)
590-
if err != nil {
591-
log.Fatal("cannot get chain block to start looking for events")
592-
}
593-
initBlock = chainSpecs.StartingBlock
594-
}
595-
596581
whiteListedAddr := []string{}
597582
for _, addr := range globalCfg.VochainConfig.EthereumWhiteListAddrs {
598583
if addr == "[]" {
@@ -612,12 +597,9 @@ func main() {
612597
context.Background(),
613598
w3uris,
614599
globalCfg.W3Config.ChainType,
615-
initBlock,
616-
cm,
617600
signer,
618601
vnode,
619602
evh,
620-
sc,
621603
whiteListedAddr); err != nil {
622604
log.Fatal(err)
623605
}

config/config.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ type DvoteCfg struct {
1414
Ipfs *IPFSCfg
1515
// EthConfig ethereum client config options
1616
EthConfig *EthCfg
17-
// EthEventConfig ethereum even subscription config options
18-
EthEventConfig *EthEventCfg
1917
// API api config options
2018
API *API
2119
// Metrics config options
@@ -56,13 +54,12 @@ func (c *DvoteCfg) ValidMode() bool {
5654
// NewGatewayConfig initializes the fields in the gateway config stuct
5755
func NewConfig() *DvoteCfg {
5856
return &DvoteCfg{
59-
W3Config: new(W3Cfg),
60-
VochainConfig: new(VochainCfg),
61-
Ipfs: new(IPFSCfg),
62-
EthConfig: new(EthCfg),
63-
EthEventConfig: new(EthEventCfg),
64-
API: new(API),
65-
Metrics: new(MetricsCfg),
57+
W3Config: new(W3Cfg),
58+
VochainConfig: new(VochainCfg),
59+
Ipfs: new(IPFSCfg),
60+
EthConfig: new(EthCfg),
61+
API: new(API),
62+
Metrics: new(MetricsCfg),
6663
}
6764
}
6865

@@ -120,13 +117,6 @@ type W3Cfg struct {
120117
W3External []string
121118
}
122119

123-
type EthEventCfg struct {
124-
// CensusSync if true census sync will be enabled
125-
CensusSync bool
126-
// SubscribeOnly if true only new received events will be processed, otherwise all events of the current chain will be processed
127-
SubscribeOnly bool
128-
}
129-
130120
// VochainCfg includes all possible config params needed by the Vochain
131121
type VochainCfg struct {
132122
// Chain is the network name to connect with

dockerfiles/dvotenode/env.example

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ DVOTE_DEV=True
2727
#DVOTE_ETHCONFIG_BOOTNODES=
2828
#DVOTE_ETHCONFIG_TRUSTEDPEERS=
2929
#DVOTE_ETHCONFIG_NOWAITSYNC=False
30-
#DVOTE_ETHEVENTCONFIG_CENSUSSYNC=True
31-
#DVOTE_ETHEVENTCONFIG_SUBSCRIBEONLY=False
3230
#DVOTE_W3CONFIG_ROUTE=/web3
3331
#DVOTE_W3CONFIG_ENABLED=False
3432
#DVOTE_W3CONFIG_RPCPORT=9091

ethereum/ethevents/events.go

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@ import (
1414
ethtypes "github.com/ethereum/go-ethereum/core/types"
1515
cttypes "github.com/tendermint/tendermint/rpc/core/types"
1616
ttypes "github.com/tendermint/tendermint/types"
17-
"go.vocdoni.io/dvote/census"
1817
"go.vocdoni.io/dvote/crypto/ethereum"
1918
"go.vocdoni.io/dvote/data"
2019
"go.vocdoni.io/dvote/types"
2120
"go.vocdoni.io/dvote/vochain"
22-
"go.vocdoni.io/dvote/vochain/scrutinizer"
2321
"go.vocdoni.io/proto/build/go/models"
2422

2523
"github.com/ethereum/go-ethereum/ethclient"
@@ -28,7 +26,7 @@ import (
2826
)
2927

3028
const (
31-
readBlocksPast = 100
29+
readBlocksPast = 200
3230
)
3331

3432
var blockConfirmThreshold = map[models.SourceNetworkId]time.Duration{
@@ -42,8 +40,6 @@ var blockConfirmThreshold = map[models.SourceNetworkId]time.Duration{
4240
type EthereumEvents struct {
4341
// contracts handle
4442
VotingHandle *ethereumhandler.EthereumHandler
45-
// dial web3 addresses
46-
DialAddrs []string
4743
// list of handler functions that will be called on events
4844
// TODO: add context on callbacks
4945
// TODO: return errors on callbacks
@@ -52,20 +48,18 @@ type EthereumEvents struct {
5248
Signer *ethereum.SignKeys
5349
// VochainApp is a pointer to the Vochain BaseApplication allowing to call SendTx method
5450
VochainApp *vochain.BaseApplication
55-
// Census is the census manager service
56-
Census CensusManager
5751
// EventProcessor handles events pending to process
5852
EventProcessor *EventProcessor
59-
// Scrutinizer
60-
Scrutinizer *scrutinizer.Scrutinizer
6153
// EthereumWhiteListAddrs
6254
EthereumWhiteListAddrs map[common.Address]bool
6355
// ContractsAddress
6456
ContractsAddress []common.Address
6557
// ContractsInfo holds useful info for working with the desired contracts
6658
ContractsInfo map[string]*ethereumhandler.EthereumContract
6759
// EthereumLastKnownBlock keeps track of the latest Ethereum known block
68-
EthereumLastKnownBlock atomic.Value
60+
EthereumLastKnownBlock uint64
61+
// block confirm threshold for the network
62+
blockConfirmThreshold time.Duration
6963
}
7064

7165
type timedEvent struct {
@@ -105,8 +99,6 @@ func NewEthEvents(
10599
contracts map[string]*ethereumhandler.EthereumContract,
106100
srcNetworkId models.SourceNetworkId,
107101
signer *ethereum.SignKeys,
108-
w3Endpoints []string,
109-
cens *census.Manager,
110102
vocapp *vochain.BaseApplication,
111103
ethereumWhiteList []string,
112104
) (*EthereumEvents, error) {
@@ -136,8 +128,6 @@ func NewEthEvents(
136128
srcNetworkId, confirmThreshold)
137129
ethev := &EthereumEvents{
138130
Signer: signer,
139-
DialAddrs: w3Endpoints,
140-
Census: cens,
141131
VochainApp: vocapp,
142132
EventProcessor: &EventProcessor{
143133
Events: make(chan ethtypes.Log),
@@ -147,6 +137,7 @@ func NewEthEvents(
147137
EthereumWhiteListAddrs: secureAddrList,
148138
ContractsAddress: contractsAddress,
149139
ContractsInfo: contracts,
140+
blockConfirmThreshold: confirmThreshold,
150141
}
151142

152143
return ethev, nil
@@ -167,29 +158,24 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context) {
167158
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
168159
defer cancel()
169160
var err error
170-
uLastBlockNumber, err := ev.VotingHandle.EthereumClient.BlockNumber(tctx)
161+
lastBlockNumber, err := ev.VotingHandle.EthereumClient.BlockNumber(tctx)
171162
if err != nil {
172163
log.Fatalf("cannot get last block number: (%v)", err)
173164
}
174-
lastBlockNumber := int64(uLastBlockNumber)
175-
fromBlock := ev.EthereumLastKnownBlock.Load().(int64)
176-
if fromBlock == 0 {
177-
ev.EthereumLastKnownBlock.Store(lastBlockNumber)
178-
fromBlock = lastBlockNumber
179-
}
165+
atomic.StoreUint64(&ev.EthereumLastKnownBlock, lastBlockNumber)
180166

181167
// For security, even if subscribe only, force to process
182168
// from some past blocks defined by `readBlocksPast`
183-
if err := ev.processEventLogsFromTo(ctx, fromBlock-readBlocksPast,
169+
if err := ev.processEventLogsFromTo(ctx, lastBlockNumber-readBlocksPast,
184170
lastBlockNumber, ev.VotingHandle.EthereumClient); err != nil {
185171
log.Errorf("cannot process event logs: %v", err)
186172
}
187173

188174
// Subscribing from latest known block
189-
log.Infof("subscribing to Ethereum Events from block %d", fromBlock)
175+
log.Infof("subscribing to Ethereum Events from block %d", lastBlockNumber)
190176
query := eth.FilterQuery{
191177
Addresses: ev.ContractsAddress,
192-
FromBlock: big.NewInt(fromBlock),
178+
FromBlock: new(big.Int).SetUint64(lastBlockNumber),
193179
}
194180
logs := make(chan ethtypes.Log, 30) // give it some buffer as recommended by the package library
195181
var sub eth.Subscription
@@ -204,38 +190,44 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context) {
204190
}
205191

206192
// Monitorize ethereum client connection
207-
connFailure := make(chan bool, 1)
208-
go func(chan bool) {
193+
connFailure := make(chan error, 1)
194+
go func(chan error) {
195+
time.Sleep(ev.blockConfirmThreshold)
209196
for {
210-
time.Sleep(time.Second * 5)
211-
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
212-
defer cancel()
213-
block, err := ev.VotingHandle.EthereumClient.BlockNumber(tctx)
214-
if err != nil {
215-
log.Warnf("cannot check ethereum connection while running the event processor, %s", err)
216-
connFailure <- false
197+
select {
198+
case <-ctx.Done():
217199
return
200+
default:
201+
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
202+
block, err := ev.VotingHandle.EthereumClient.BlockNumber(tctx)
203+
cancel()
204+
if err != nil {
205+
connFailure <- err
206+
return
207+
}
208+
if atomic.LoadUint64(&ev.EthereumLastKnownBlock) >= block {
209+
connFailure <- fmt.Errorf("web3 frozen, block have not changed on the last %s",
210+
ev.blockConfirmThreshold)
211+
return
212+
}
213+
atomic.StoreUint64(&ev.EthereumLastKnownBlock, block)
214+
time.Sleep(ev.blockConfirmThreshold)
218215
}
219-
ev.EthereumLastKnownBlock.Store(int64(block))
220216
}
221217
}(connFailure)
222218

223219
// Block forever and start processing events
224220
for {
225221
select {
226-
case <-sub.Err():
222+
case err := <-sub.Err():
227223
ev.EventProcessor.eventProcessorRunning = false
228-
ev.VotingHandle = nil
229-
log.Warn("ethereum events subscription error on channel")
230-
log.Infof("restarting ethereum events subscription with web3: %+v", ev.DialAddrs)
224+
log.Warnf("ethereum events connection error: %v", err)
231225
return
232226
case event := <-logs:
233227
ev.EventProcessor.Events <- event
234-
case <-connFailure:
228+
case err := <-connFailure:
235229
ev.EventProcessor.eventProcessorRunning = false
236-
ev.VotingHandle = nil
237-
log.Warn("ethereum events subscription error on channel")
238-
log.Infof("restarting ethereum events subscription with web3: %+v", ev.DialAddrs)
230+
log.Warnf("ethereum events connection failure: %v", err)
239231
return
240232
}
241233
}
@@ -244,7 +236,7 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context) {
244236
// ReadEthereumEventLogs reads the oracle
245237
// defined smart contract and looks for events.
246238
func (ev *EthereumEvents) processEventLogsFromTo(ctx context.Context,
247-
from, to int64, client *ethclient.Client) error {
239+
from, to uint64, client *ethclient.Client) error {
248240
for name, contract := range ev.ContractsInfo {
249241
if !contract.ListenForEvents {
250242
continue
@@ -254,8 +246,8 @@ func (ev *EthereumEvents) processEventLogsFromTo(ctx context.Context,
254246
}
255247
log.Infof("reading ethereum events from block %d to %d", from, to)
256248
query := eth.FilterQuery{
257-
FromBlock: big.NewInt(from),
258-
ToBlock: big.NewInt(to),
249+
FromBlock: new(big.Int).SetUint64(from),
250+
ToBlock: new(big.Int).SetUint64(to),
259251
Addresses: ev.ContractsAddress,
260252
}
261253

ethereum/ethevents/handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ var ethereumEventList = map[string]string{
7171

7272
// Number of blocks to add to the process start block to start as soon as possible the process.
7373
// This start block addition takes into account that can be some delay on the Vochain commit operation.
74-
const ProcessStartBlockDelay = 10
74+
const processStartBlockDelay = 10
7575

7676
// HandleVochainOracle handles the events on ethereum for the Oracle.
7777
func HandleVochainOracle(ctx context.Context, event *ethtypes.Log, e *EthereumEvents) error {
@@ -133,7 +133,7 @@ func HandleVochainOracle(ctx context.Context, event *ethtypes.Log, e *EthereumEv
133133
if processTx.Process.Status == models.ProcessStatus_READY &&
134134
processTx.Process.StartBlock == 1 &&
135135
processTx.Process.Mode.AutoStart {
136-
processTx.Process.StartBlock = e.VochainApp.Height() + ProcessStartBlockDelay
136+
processTx.Process.StartBlock = e.VochainApp.Height() + processStartBlockDelay
137137
}
138138

139139
stx := &models.SignedTx{}

0 commit comments

Comments
 (0)