Skip to content

Commit 1f9ff25

Browse files
jordipainanp4u
authored andcommitted
oracle: add oracle web3 endpoint pool
If an Oracle that is subscribed to one or more ethereum smart contract events have any problem with the endpoint that is connected with, after some retries, another web3 endpoint will be used. If there is not any web3 endpoint working the application will terminate. When a new connection is created the ethevents service will be restarted as well as the PrintInfo go routine. A context.WithCancel is being used to terminate all the goroutines created by the ethereum events service and the contracts will be resolved on the ENS an instanciated again. That way if any event is lost during the time that there are connection issues, will be catch again on restart.
1 parent 5b292fc commit 1f9ff25

File tree

6 files changed

+179
-97
lines changed

6 files changed

+179
-97
lines changed

cmd/dvotenode/dvotenode.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func newConfig() (*config.DvoteCfg, config.Error) {
110110
// ethereum web3
111111
globalCfg.W3Config.ChainType = *flag.StringP("ethChain", "c", "goerli",
112112
fmt.Sprintf("Ethereum blockchain to use: %s", ethchain.AvailableChains))
113-
globalCfg.W3Config.W3External = *flag.StringP("w3External", "w", "",
113+
globalCfg.W3Config.W3External = *flag.StringArrayP("w3External", "w", []string{},
114114
"ethereum web3 endpoint. Supported protocols: http(s)://, ws(s):// and IPC filepath")
115115
// ipfs
116116
globalCfg.Ipfs.NoInit = *flag.Bool("ipfsNoInit", false,
@@ -561,45 +561,56 @@ func main() {
561561
go kk.RevealUnpublished()
562562
}
563563

564-
// Start ethereum events (if web3 endpoint configured)
565-
if globalCfg.W3Config.W3External != "" {
566-
var evh []ethevents.EventHandler
567-
var w3uri string
568-
switch {
569-
case strings.HasPrefix(globalCfg.W3Config.W3External, "ws"):
570-
w3uri = globalCfg.W3Config.W3External
571-
case strings.HasSuffix(globalCfg.W3Config.W3External, "ipc"):
572-
w3uri = globalCfg.W3Config.W3External
573-
default:
574-
log.Fatal("web3 external must be websocket or IPC for event subscription")
564+
var w3uris []string
565+
if globalCfg.W3Config.W3External != nil {
566+
for idx, web3Endpoint := range globalCfg.W3Config.W3External {
567+
web3EndpointTrimmed := strings.Trim(web3Endpoint, `"[]`)
568+
log.Debugf("web3endpoint %d: %s", idx, web3EndpointTrimmed)
569+
switch {
570+
case strings.HasPrefix(web3EndpointTrimmed, "ws"):
571+
w3uris = append(w3uris, web3EndpointTrimmed)
572+
case strings.HasSuffix(web3EndpointTrimmed, "ipc"):
573+
w3uris = append(w3uris, web3EndpointTrimmed)
574+
default:
575+
log.Warnf(`invalid web3 endpoint %s must be websocket or IPC
576+
for event subscription`, web3EndpointTrimmed)
577+
}
575578
}
579+
}
580+
// Start ethereum events (if at least one web3 endpoint configured)
581+
if len(w3uris) > 0 {
582+
log.Infof("using %+v web3 endpoints", w3uris)
576583

584+
var evh []ethevents.EventHandler
577585
evh = append(evh, ethevents.HandleVochainOracle)
578586

579-
var initBlock *int64
587+
var initBlock int64
580588
if !globalCfg.EthEventConfig.SubscribeOnly {
581-
initBlock = new(int64)
582589
chainSpecs, err := ethchain.SpecsFor(globalCfg.W3Config.ChainType)
583590
if err != nil {
584-
log.Warn("cannot get chain block to start looking for events, using 0")
585-
*initBlock = 0
586-
} else {
587-
*initBlock = chainSpecs.StartingBlock
591+
log.Fatal("cannot get chain block to start looking for events")
588592
}
593+
initBlock = chainSpecs.StartingBlock
589594
}
590595

591596
whiteListedAddr := []string{}
592597
for _, addr := range globalCfg.VochainConfig.EthereumWhiteListAddrs {
598+
if addr == "[]" {
599+
// avoid empty address and pflag autofill in case of empty array
600+
continue
601+
}
593602
if ethcommon.IsHexAddress(addr) {
594603
whiteListedAddr = append(whiteListedAddr, addr)
604+
} else {
605+
log.Warnf("whitelist address %s is not a valid hex address", addr)
595606
}
596607
}
597608
if len(whiteListedAddr) > 0 {
598609
log.Infof("ethereum whitelisted addresses %+v", whiteListedAddr)
599610
}
600611
if err := service.EthEvents(
601612
context.Background(),
602-
w3uri,
613+
w3uris,
603614
globalCfg.W3Config.ChainType,
604615
initBlock,
605616
cm,
@@ -633,8 +644,7 @@ func main() {
633644
if err != nil {
634645
log.Fatal(err)
635646
}
636-
if err := apior.EnableERC20(globalCfg.W3Config.ChainType,
637-
globalCfg.W3Config.W3External); err != nil {
647+
if err := apior.EnableERC20(globalCfg.W3Config.ChainType, globalCfg.W3Config.W3External); err != nil {
638648
log.Fatal(err)
639649
}
640650
}

config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ type EthCfg struct {
116116
type W3Cfg struct {
117117
// ChainType chain to connect with
118118
ChainType string
119-
// W3External URL of an external ethereum node to connect with
120-
W3External string
119+
// W3External URLs of an external ethereum nodes to connect with
120+
W3External []string
121121
}
122122

123123
type EthEventCfg struct {

ethereum/ethevents/events.go

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ var blockConfirmThreshold = map[models.SourceNetworkId]time.Duration{
4141
type EthereumEvents struct {
4242
// contracts handle
4343
VotingHandle *ethereumhandler.EthereumHandler
44-
// dial web3 address
45-
DialAddr string
44+
// dial web3 addresses
45+
DialAddrs []string
4646
// list of handler functions that will be called on events
4747
// TODO: add context on callbacks
4848
// TODO: return errors on callbacks
@@ -63,6 +63,8 @@ type EthereumEvents struct {
6363
ContractsAddress []common.Address
6464
// ContractsInfo holds useful info for working with the desired contracts
6565
ContractsInfo map[string]*ethereumhandler.EthereumContract
66+
// EthereumLastKnownBlock keeps track of the latest Ethereum known block
67+
EthereumLastKnownBlock int64
6668
}
6769

6870
type timedEvent struct {
@@ -102,21 +104,20 @@ func NewEthEvents(
102104
contracts map[string]*ethereumhandler.EthereumContract,
103105
srcNetworkId models.SourceNetworkId,
104106
signer *ethereum.SignKeys,
105-
w3Endpoint string,
107+
w3Endpoints []string,
106108
cens *census.Manager,
107109
vocapp *vochain.BaseApplication,
108110
ethereumWhiteList []string,
109111
) (*EthereumEvents, error) {
110112
// try to connect to default addr if w3Endpoint is empty
111-
if len(w3Endpoint) == 0 {
113+
if len(w3Endpoints) == 0 {
112114
return nil, fmt.Errorf("no w3Endpoint specified on Ethereum Events")
113115
}
114-
ph, err := ethereumhandler.NewEthereumHandler(contracts, srcNetworkId, w3Endpoint)
116+
ph, err := ethereumhandler.NewEthereumHandler(contracts, srcNetworkId, w3Endpoints)
115117
if err != nil {
116118
return nil, fmt.Errorf("cannot create voting handle: %w", err)
117119
}
118120
ph.WaitSync()
119-
go ph.PrintInfo(context.Background(), time.Second*60)
120121
secureAddrList := make(map[common.Address]bool, len(ethereumWhiteList))
121122
if len(ethereumWhiteList) != 0 {
122123
for _, sAddr := range ethereumWhiteList {
@@ -146,7 +147,7 @@ func NewEthEvents(
146147
ethev := &EthereumEvents{
147148
VotingHandle: ph,
148149
Signer: signer,
149-
DialAddr: w3Endpoint,
150+
DialAddrs: w3Endpoints,
150151
Census: cens,
151152
VochainApp: vocapp,
152153
EventProcessor: &EventProcessor{
@@ -169,14 +170,13 @@ func (ev *EthereumEvents) AddEventHandler(handler EventHandler) {
169170

170171
// SubscribeEthereumEventLogs enables the subscription of Ethereum events for new blocks.
171172
// Events are Queued for 60 seconds before processed in order to avoid possible blockchain
172-
// reversions. If fromBlock nil, subscription will start on current block.
173+
// reversions.
173174
// Blocking function (use go routine).
174-
func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBlock *int64) {
175+
func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBlock int64) {
175176
var sub eth.Subscription
176177
var err error
177-
178178
// Get current block
179-
blockTctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout*2)
179+
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
180180
defer cancel()
181181
var lastBlock int64
182182
var blk *ethtypes.Block
@@ -185,12 +185,12 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBl
185185
if errors > 5 {
186186
log.Fatal("the web3 client connection is not working")
187187
}
188-
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(blockTctx, nil)
188+
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
189189
if err != nil {
190190
log.Errorf("cannot get ethereum block: %s", err)
191191
errors++
192192
time.Sleep(time.Second * 2)
193-
if err := ev.VotingHandle.Connect(ev.DialAddr); err != nil {
193+
if err := ev.VotingHandle.Connect(ev.DialAddrs); err != nil {
194194
log.Warn(err)
195195
}
196196
continue
@@ -199,13 +199,12 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBl
199199
break
200200
}
201201

202-
// If fromBlock not nil, process past events
203-
if fromBlock != nil {
202+
if fromBlock != 0 {
204203
// do not retry if error processing old logs
205204
// Unless this is the first set of oracles, it is almost
206205
// sure that the events are already processed so do not
207206
// block and do not fatal here
208-
if err := ev.processEventLogsFromTo(ctx, *fromBlock, lastBlock,
207+
if err := ev.processEventLogsFromTo(ctx, fromBlock, lastBlock,
209208
ev.VotingHandle.EthereumClient); err != nil {
210209
log.Errorf("cannot process event logs: %v", err)
211210
}
@@ -218,13 +217,15 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBl
218217
if errors > 5 {
219218
log.Fatal("the web3 connection is not working")
220219
}
221-
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(blockTctx, nil)
220+
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
221+
defer cancel()
222+
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
222223
if err != nil {
223224
log.Errorf("cannot update block number: %v", err)
224225
errors++
225226
time.Sleep(time.Second * 2)
226227
// If any error try close the client and reconnect
227-
if err := ev.VotingHandle.Connect(ev.DialAddr); err != nil {
228+
if err := ev.VotingHandle.Connect(ev.DialAddrs); err != nil {
228229
log.Warn(err)
229230
}
230231
continue
@@ -247,11 +248,11 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBl
247248
}
248249
}
249250

250-
blockTctx, cancel = context.WithTimeout(ctx, types.EthereumReadTimeout*2)
251+
tctx, cancel = context.WithTimeout(ctx, types.EthereumReadTimeout*2)
251252
defer cancel()
252253
// And then subscribe to new events
253254
// Update block number
254-
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(blockTctx, nil)
255+
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
255256
if err != nil {
256257
// accept to not update the block number here
257258
// if any error, the old blk fetched will be used instead.
@@ -278,7 +279,7 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBl
278279
log.Errorf("cannot subscribe to ethereum client log: %s", err)
279280
errors++
280281
time.Sleep(time.Second * 2)
281-
if err := ev.VotingHandle.Connect(ev.DialAddr); err != nil {
282+
if err := ev.VotingHandle.Connect(ev.DialAddrs); err != nil {
282283
log.Warn(err)
283284
}
284285
continue
@@ -290,13 +291,37 @@ func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBl
290291
go ev.runEventProcessor(ctx)
291292
}
292293

294+
// check eth connection
295+
connFailure := make(chan bool, 1)
296+
go func(chan bool) {
297+
for {
298+
time.Sleep(time.Second * 5)
299+
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
300+
defer cancel()
301+
block, err := ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
302+
if err != nil {
303+
log.Warnf("cannot check ethereum connection while running the event processor, %s", err)
304+
connFailure <- false
305+
return
306+
}
307+
ev.EthereumLastKnownBlock = block.Number().Int64()
308+
}
309+
}(connFailure)
310+
293311
for {
294312
select {
295-
case err := <-sub.Err():
296-
// TODO: @jordipainan do not fatal here, handle error on event subscription channel
297-
log.Fatalf("ethereum events subscription error on channel: %s", err)
313+
case <-sub.Err():
314+
log.Warn("ethereum events subscription error on channel")
315+
log.Infof("restarting ethereum events subscription with web3: %+v", ev.DialAddrs)
316+
ev.EventProcessor.eventProcessorRunning = false
317+
return
298318
case event := <-logs:
299319
ev.EventProcessor.Events <- event
320+
case <-connFailure:
321+
log.Warn("ethereum events subscription error on channel")
322+
log.Infof("restarting ethereum events subscription with web3: %+v", ev.DialAddrs)
323+
ev.EventProcessor.eventProcessorRunning = false
324+
return
300325
}
301326
}
302327
}
@@ -360,7 +385,7 @@ func (ep *EventProcessor) del(event *ethtypes.Log) {
360385
delete(ep.eventQueue, eventID)
361386
}
362387

363-
// next returns the first log event rady to be processed
388+
// next returns the first log event ready to be processed
364389
func (ep *EventProcessor) next() *ethtypes.Log {
365390
ep.eventQueueLock.Lock()
366391
defer ep.eventQueueLock.Unlock()
@@ -375,20 +400,28 @@ func (ep *EventProcessor) next() *ethtypes.Log {
375400

376401
func (ev *EthereumEvents) runEventProcessor(ctx context.Context) {
377402
ev.EventProcessor.eventProcessorRunning = true
403+
log.Info("starting event processor routine")
378404
go func() {
379405
for {
380-
event := <-ev.EventProcessor.Events
381-
evtJSON, err := event.MarshalJSON()
382-
if err != nil {
383-
log.Error(err)
384-
continue
385-
}
386-
if event.Removed {
387-
log.Warnf("removing reversed log event: %s", evtJSON)
388-
ev.EventProcessor.del(&event)
389-
} else {
390-
log.Debugf("queued event log: %s", evtJSON)
391-
ev.EventProcessor.add(&event)
406+
select {
407+
case <-ctx.Done():
408+
return
409+
case event := <-ev.EventProcessor.Events:
410+
evtJSON, err := event.MarshalJSON()
411+
if err != nil {
412+
log.Error(err)
413+
continue
414+
}
415+
if event.Removed {
416+
log.Warnf("removing reversed log event: %s", evtJSON)
417+
ev.EventProcessor.del(&event)
418+
} else {
419+
if event.BlockNumber == 0 {
420+
log.Fatalf("unexpected event block number 0", event)
421+
}
422+
log.Debugf("queued event log: %s", evtJSON)
423+
ev.EventProcessor.add(&event)
424+
}
392425
}
393426
}
394427
}()
@@ -397,7 +430,11 @@ func (ev *EthereumEvents) runEventProcessor(ctx context.Context) {
397430
// TODO(mvdan): Perhaps refactor this so we don't need a sleep.
398431
// Maybe use a queue-like structure sorted by added time,
399432
// and separately mark the removed events to ignore them.
400-
time.Sleep(2 * time.Second)
433+
select {
434+
case <-ctx.Done():
435+
return
436+
case <-time.After(2 * time.Second):
437+
}
401438
if tev := ev.EventProcessor.next(); tev != nil {
402439
for i, handler := range ev.EventHandlers {
403440
log.Infof("executing event handler %d for %s", i, ev.EventProcessor.id(tev))

0 commit comments

Comments
 (0)