Skip to content

Commit 25171f6

Browse files
jordipainanp4u
authored andcommitted
ethevents: decouple EthereumHandler from EthEvents
and simplify the event subscription connection logic. EthEvents was calling NewEthereumHandler on the object creation itself and because of this the function usage was less flexible. Now the EthEvents EthereumHandle is added after the object instanciation thus making the EthEvents subscription go-routine simpler. The startBlock attribute was also deleted in favor of EthereumLastKnownBlock which is an atomic value shared among different go routines. The former follows the subscription connection logic simplification. The SubscribeEthereumEventLogs function had a weird/complex logic for reading some past events during the initial connection and now also with the reconnection thanks to the web3 endpoints pool, so the new logic handles the subscription as follows: - If subscribeOnly flag == true: - Get latest block and start from latestBlock - readBlocksPast - If latest block cannot be fetch -> Fatal - If subscribeOnly flag = false - Get start block from the specs and use it as latestBlock - If start block does not exist for the spec -> Fatal In both cases, once the initialization is done, the EthereumLastKnownBlock attribute will keep track of the latest known block of an specific web3 client and if at some point the connection is dropped and the subscription is initialized again, the start block for the subscription is going to be the latest known block which is updated every 5 seconds minus the readBlocks past for adding some extra security. The last change made for having the subscription connection mechanism simplified is to share the same context with cancel with the PrintInfo function because they share the same web3 client. From now on the PrintInfo blocking function is terminated if the subscription connection is not working and reinitialized in the same goroutine that keeps track of the ethereum events subscription.
1 parent 1f9ff25 commit 25171f6

File tree

3 files changed

+52
-143
lines changed

3 files changed

+52
-143
lines changed

ethereum/ethevents/events.go

Lines changed: 39 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math/big"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
eth "github.com/ethereum/go-ethereum"
@@ -64,7 +65,7 @@ type EthereumEvents struct {
6465
// ContractsInfo holds useful info for working with the desired contracts
6566
ContractsInfo map[string]*ethereumhandler.EthereumContract
6667
// EthereumLastKnownBlock keeps track of the latest Ethereum known block
67-
EthereumLastKnownBlock int64
68+
EthereumLastKnownBlock atomic.Value
6869
}
6970

7071
type timedEvent struct {
@@ -109,15 +110,6 @@ func NewEthEvents(
109110
vocapp *vochain.BaseApplication,
110111
ethereumWhiteList []string,
111112
) (*EthereumEvents, error) {
112-
// try to connect to default addr if w3Endpoint is empty
113-
if len(w3Endpoints) == 0 {
114-
return nil, fmt.Errorf("no w3Endpoint specified on Ethereum Events")
115-
}
116-
ph, err := ethereumhandler.NewEthereumHandler(contracts, srcNetworkId, w3Endpoints)
117-
if err != nil {
118-
return nil, fmt.Errorf("cannot create voting handle: %w", err)
119-
}
120-
ph.WaitSync()
121113
secureAddrList := make(map[common.Address]bool, len(ethereumWhiteList))
122114
if len(ethereumWhiteList) != 0 {
123115
for _, sAddr := range ethereumWhiteList {
@@ -128,7 +120,6 @@ func NewEthEvents(
128120
secureAddrList[addr] = true
129121
}
130122
}
131-
132123
contractsAddress := []common.Address{}
133124
for _, contract := range contracts {
134125
if !bytes.Equal(contract.Address.Bytes(), common.Address{}.Bytes()) {
@@ -137,19 +128,17 @@ func NewEthEvents(
137128
}
138129
}
139130
}
140-
141131
confirmThreshold := blockConfirmThreshold[0]
142132
if _, ok := blockConfirmThreshold[srcNetworkId]; ok {
143133
confirmThreshold = blockConfirmThreshold[srcNetworkId]
144134
}
145135
log.Infof("chain %s found, block confirmation threshold set to %s",
146136
srcNetworkId, confirmThreshold)
147137
ethev := &EthereumEvents{
148-
VotingHandle: ph,
149-
Signer: signer,
150-
DialAddrs: w3Endpoints,
151-
Census: cens,
152-
VochainApp: vocapp,
138+
Signer: signer,
139+
DialAddrs: w3Endpoints,
140+
Census: cens,
141+
VochainApp: vocapp,
153142
EventProcessor: &EventProcessor{
154143
Events: make(chan ethtypes.Log),
155144
EventProcessThreshold: confirmThreshold,
@@ -172,155 +161,81 @@ func (ev *EthereumEvents) AddEventHandler(handler EventHandler) {
172161
// Events are Queued for 60 seconds before processed in order to avoid possible blockchain
173162
// reversions.
174163
// Blocking function (use go routine).
175-
func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context, fromBlock int64) {
176-
var sub eth.Subscription
177-
var err error
164+
func (ev *EthereumEvents) SubscribeEthereumEventLogs(ctx context.Context) {
165+
178166
// Get current block
179167
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
180168
defer cancel()
181-
var lastBlock int64
182-
var blk *ethtypes.Block
183-
errors := 0
184-
for {
185-
if errors > 5 {
186-
log.Fatal("the web3 client connection is not working")
187-
}
188-
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
189-
if err != nil {
190-
log.Errorf("cannot get ethereum block: %s", err)
191-
errors++
192-
time.Sleep(time.Second * 2)
193-
if err := ev.VotingHandle.Connect(ev.DialAddrs); err != nil {
194-
log.Warn(err)
195-
}
196-
continue
197-
}
198-
lastBlock = blk.Number().Int64()
199-
break
169+
var err error
170+
uLastBlockNumber, err := ev.VotingHandle.EthereumClient.BlockNumber(tctx)
171+
if err != nil {
172+
log.Fatalf("cannot get last block number: (%v)", err)
200173
}
201-
202-
if fromBlock != 0 {
203-
// do not retry if error processing old logs
204-
// Unless this is the first set of oracles, it is almost
205-
// sure that the events are already processed so do not
206-
// block and do not fatal here
207-
if err := ev.processEventLogsFromTo(ctx, fromBlock, lastBlock,
208-
ev.VotingHandle.EthereumClient); err != nil {
209-
log.Errorf("cannot process event logs: %v", err)
210-
}
211-
// Update block number
212-
// Expect the client to be connected here
213-
// and the call is successful, if not
214-
// block the execution and alert
215-
errors = 0
216-
for {
217-
if errors > 5 {
218-
log.Fatal("the web3 connection is not working")
219-
}
220-
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
221-
defer cancel()
222-
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
223-
if err != nil {
224-
log.Errorf("cannot update block number: %v", err)
225-
errors++
226-
time.Sleep(time.Second * 2)
227-
// If any error try close the client and reconnect
228-
if err := ev.VotingHandle.Connect(ev.DialAddrs); err != nil {
229-
log.Warn(err)
230-
}
231-
continue
232-
}
233-
lastBlock = blk.Number().Int64()
234-
break
235-
}
236-
// For security, read also the new passed blocks before subscribing
237-
// Same as the last processEventLogsFromTo function call
238-
if err := ev.processEventLogsFromTo(ctx, lastBlock, blk.Number().Int64(),
239-
ev.VotingHandle.EthereumClient); err != nil {
240-
log.Errorf("cannot process event logs: %s", err)
241-
}
242-
} else {
243-
// For security, even if subscribe only, force to process at least the past `readBlockPast`
244-
// Same as the last processEventLogsFromTo function call
245-
if err := ev.processEventLogsFromTo(ctx, blk.Number().Int64()-readBlocksPast,
246-
blk.Number().Int64(), ev.VotingHandle.EthereumClient); err != nil {
247-
log.Errorf("cannot process event logs: %v", err)
248-
}
174+
lastBlockNumber := int64(uLastBlockNumber)
175+
fromBlock := ev.EthereumLastKnownBlock.Load().(int64)
176+
if fromBlock == 0 {
177+
ev.EthereumLastKnownBlock.Store(lastBlockNumber)
178+
fromBlock = lastBlockNumber
249179
}
250180

251-
tctx, cancel = context.WithTimeout(ctx, types.EthereumReadTimeout*2)
252-
defer cancel()
253-
// And then subscribe to new events
254-
// Update block number
255-
blk, err = ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
256-
if err != nil {
257-
// accept to not update the block number here
258-
// if any error, the old blk fetched will be used instead.
259-
log.Errorf("cannot upate block number: %s", err)
181+
// For security, even if subscribe only, force to process
182+
// from some past blocks defined by `readBlocksPast`
183+
if err := ev.processEventLogsFromTo(ctx, fromBlock-readBlocksPast,
184+
lastBlockNumber, ev.VotingHandle.EthereumClient); err != nil {
185+
log.Errorf("cannot process event logs: %v", err)
260186
}
261-
log.Infof("subscribing to Ethereum Events from block %d", blk.Number().Int64())
187+
188+
// Subscribing from latest known block
189+
log.Infof("subscribing to Ethereum Events from block %d", fromBlock)
262190
query := eth.FilterQuery{
263191
Addresses: ev.ContractsAddress,
264-
FromBlock: blk.Number(),
192+
FromBlock: big.NewInt(fromBlock),
265193
}
266-
267194
logs := make(chan ethtypes.Log, 30) // give it some buffer as recommended by the package library
268-
// block here, since it is not acceptable to
269-
// start the event processor if the client
270-
// cannot be subscribed to the logs.
271-
// Use the same policy as processEventsLogsFromTo
272-
errors = 0
273-
for {
274-
if errors > 5 {
275-
log.Fatal("the web3 connection is not working")
276-
}
277-
sub, err = ev.VotingHandle.EthereumClient.SubscribeFilterLogs(ctx, query, logs)
278-
if err != nil {
279-
log.Errorf("cannot subscribe to ethereum client log: %s", err)
280-
errors++
281-
time.Sleep(time.Second * 2)
282-
if err := ev.VotingHandle.Connect(ev.DialAddrs); err != nil {
283-
log.Warn(err)
284-
}
285-
continue
286-
}
287-
break
195+
var sub eth.Subscription
196+
sub, err = ev.VotingHandle.EthereumClient.SubscribeFilterLogs(ctx, query, logs)
197+
if err != nil {
198+
log.Fatalf("cannot subscribe to ethereum events: (%v)", err)
288199
}
289200

201+
// Run event processor
290202
if !ev.EventProcessor.eventProcessorRunning {
291203
go ev.runEventProcessor(ctx)
292204
}
293205

294-
// check eth connection
206+
// Monitorize ethereum client connection
295207
connFailure := make(chan bool, 1)
296208
go func(chan bool) {
297209
for {
298210
time.Sleep(time.Second * 5)
299211
tctx, cancel := context.WithTimeout(ctx, types.EthereumReadTimeout)
300212
defer cancel()
301-
block, err := ev.VotingHandle.EthereumClient.BlockByNumber(tctx, nil)
213+
block, err := ev.VotingHandle.EthereumClient.BlockNumber(tctx)
302214
if err != nil {
303215
log.Warnf("cannot check ethereum connection while running the event processor, %s", err)
304216
connFailure <- false
305217
return
306218
}
307-
ev.EthereumLastKnownBlock = block.Number().Int64()
219+
ev.EthereumLastKnownBlock.Store(int64(block))
308220
}
309221
}(connFailure)
310222

223+
// Block forever and start processing events
311224
for {
312225
select {
313226
case <-sub.Err():
227+
ev.EventProcessor.eventProcessorRunning = false
228+
ev.VotingHandle = nil
314229
log.Warn("ethereum events subscription error on channel")
315230
log.Infof("restarting ethereum events subscription with web3: %+v", ev.DialAddrs)
316-
ev.EventProcessor.eventProcessorRunning = false
317231
return
318232
case event := <-logs:
319233
ev.EventProcessor.Events <- event
320234
case <-connFailure:
235+
ev.EventProcessor.eventProcessorRunning = false
236+
ev.VotingHandle = nil
321237
log.Warn("ethereum events subscription error on channel")
322238
log.Infof("restarting ethereum events subscription with web3: %+v", ev.DialAddrs)
323-
ev.EventProcessor.eventProcessorRunning = false
324239
return
325240
}
326241
}

ethereum/handler/ethereumHandler.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func NewEthereumHandler(contracts map[string]*EthereumContract, srcNetworkId mod
7171
if err := eh.Connect(dialEndpoints); err != nil {
7272
return nil, err
7373
}
74+
eh.WaitSync()
7475
log.Infof("Using ENS Registry at address: %s", contracts[ContractNameENSregistry].Address.Hex())
7576
ctx, cancel := context.WithTimeout(context.Background(), types.EthereumReadTimeout)
7677
defer cancel()
@@ -82,7 +83,6 @@ func NewEthereumHandler(contracts map[string]*EthereumContract, srcNetworkId mod
8283
log.Errorf("cannot set contract instance: %s", err)
8384
}
8485
}
85-
go eh.PrintInfo(context.Background(), time.Second*20)
8686
return eh, nil
8787
}
8888

@@ -122,7 +122,7 @@ func (eh *EthereumHandler) WaitSync() {
122122
break
123123
}
124124
if err := eh.Connect(eh.Endpoints); err != nil {
125-
log.Warnf("cannot connect to any web3 endpoint: %v", err)
125+
log.Fatalf("cannot connect to any web3 endpoint: %v", err)
126126
}
127127
cancel()
128128
time.Sleep(time.Second * 5)
@@ -161,15 +161,15 @@ func (eh *EthereumHandler) PrintInfo(ctx context.Context, seconds time.Duration)
161161
var err error
162162
var syncingInfo string
163163
for {
164+
if ctx.Err() != nil {
165+
return
166+
}
164167
time.Sleep(seconds)
165168
tctx, cancel := context.WithTimeout(ctx, time.Minute)
166169
info, err = eh.SyncInfo(tctx)
167170
cancel()
168171
if err != nil {
169172
log.Warnf("error getting ethereum info: %s", err)
170-
if err := eh.Connect(eh.Endpoints); err != nil {
171-
log.Fatalf("cannot connect to any web3 endpoint: %v", err)
172-
}
173173
continue
174174
}
175175
if !info.Synced {

service/ethevents.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package service
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"go.vocdoni.io/dvote/census"
89
"go.vocdoni.io/dvote/crypto/ethereum"
@@ -46,7 +47,8 @@ func EthEvents(
4647
vocapp,
4748
ethereumWhiteList,
4849
)
49-
ev.EthereumLastKnownBlock = startBlock
50+
51+
ev.EthereumLastKnownBlock.Store(startBlock)
5052

5153
if err != nil {
5254
return fmt.Errorf("couldn't create ethereum events listener: %w", err)
@@ -61,24 +63,16 @@ func EthEvents(
6163
// Once all the resources are freed the Ethereum handler is initialized
6264
// again as well as the subscription mechanism
6365
go func() {
64-
// TODO: @jordipainan
65-
// Since the NewEthEvents service calls NewEthereumHandler
66-
// the initialized boolean avoids to create the new ethereum hanler
67-
// twice on the first run. NewEthEvents and the handler creation
68-
// should be decoupled.
69-
var initialized bool
7066
for {
7167
if ctx.Err() != nil {
7268
return
7369
}
74-
ctx, cancel := context.WithCancel(ctx)
75-
if initialized {
76-
if ev.VotingHandle, err = ethereumhandler.NewEthereumHandler(ev.ContractsInfo, specs.NetworkSource, w3uris); err != nil {
77-
log.Fatalf("cannot restart ethereum events, cannot create ethereum handler: %v", err)
78-
}
70+
if ev.VotingHandle, err = ethereumhandler.NewEthereumHandler(ev.ContractsInfo, specs.NetworkSource, w3uris); err != nil {
71+
log.Fatalf("cannot restart ethereum events, cannot create ethereum handler: %v", err)
7972
}
80-
ev.SubscribeEthereumEventLogs(ctx, ev.EthereumLastKnownBlock)
81-
initialized = true
73+
ctx, cancel := context.WithCancel(ctx)
74+
go ev.VotingHandle.PrintInfo(ctx, time.Second*20)
75+
ev.SubscribeEthereumEventLogs(ctx)
8276
// stop all child goroutines if error on subscription
8377
cancel()
8478
}

0 commit comments

Comments
 (0)