Skip to content

Commit aea4d67

Browse files
committed
all: use WaigGroup.Go() to simplify code
1 parent b49f6cb commit aea4d67

35 files changed

+112
-227
lines changed

accounts/keystore/keystore_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,14 +406,12 @@ func TestImportRace(t *testing.T) {
406406
_, ks2 := tmpKeyStore(t)
407407
var atom atomic.Uint32
408408
var wg sync.WaitGroup
409-
wg.Add(2)
410409
for i := 0; i < 2; i++ {
411-
go func() {
412-
defer wg.Done()
410+
wg.Go(func() {
413411
if _, err := ks2.Import(json, "new", "new"); err != nil {
414412
atom.Add(1)
415413
}
416-
}()
414+
})
417415
}
418416
wg.Wait()
419417
if atom.Load() != 1 {

bmt/bmt_test.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -263,23 +263,21 @@ func TestHasherConcurrency(t *testing.T) {
263263
defer pool.Drain(0)
264264
wg := sync.WaitGroup{}
265265
cycles := 100
266-
wg.Add(maxproccnt * cycles)
267266
errc := make(chan error)
268267

269268
for p := 0; p < maxproccnt; p++ {
270269
for i := 0; i < cycles; i++ {
271-
go func() {
270+
wg.Go(func() {
272271
bmt := New(pool)
273272
n := rand.Intn(4096)
274273
tdata := testDataReader(n)
275274
data := make([]byte, n)
276275
tdata.Read(data)
277276
err := testHasherCorrectness(bmt, hasher, data, n, 128)
278-
wg.Done()
279277
if err != nil {
280278
errc <- err
281279
}
282-
}()
280+
})
283281
}
284282
}
285283
go func() {
@@ -386,18 +384,16 @@ func benchmarkBMTBaseline(n int, t *testing.B) {
386384
for i := 0; i < t.N; i++ {
387385
count := int32((n-1)/hasher().Size() + 1)
388386
wg := sync.WaitGroup{}
389-
wg.Add(maxproccnt)
390387
var i int32
391388
for j := 0; j < maxproccnt; j++ {
392-
go func() {
393-
defer wg.Done()
389+
wg.Go(func() {
394390
h := hasher()
395391
for atomic.AddInt32(&i, 1) < count {
396392
h.Reset()
397393
h.Write(data)
398394
h.Sum(nil)
399395
}
400-
}()
396+
})
401397
}
402398
wg.Wait()
403399
}
@@ -437,15 +433,13 @@ func benchmarkHasherReuse(poolsize, n int, t *testing.B) {
437433
t.ResetTimer()
438434
for i := 0; i < t.N; i++ {
439435
wg := sync.WaitGroup{}
440-
wg.Add(cycles)
441436
for j := 0; j < cycles; j++ {
442-
bmt := New(pool)
443-
go func() {
444-
defer wg.Done()
437+
wg.Go(func() {
438+
bmt := New(pool)
445439
bmt.Reset()
446440
bmt.Write(data)
447441
bmt.Sum(nil)
448-
}()
442+
})
449443
}
450444
wg.Wait()
451445
}

common/prque/lazyqueue_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ func TestLazyQueue(t *testing.T) {
7979
stopCh = make(chan chan struct{})
8080
)
8181
defer wg.Wait()
82-
wg.Add(1)
83-
go func() {
84-
defer wg.Done()
82+
wg.Go(func() {
8583
for {
8684
select {
8785
case <-clock.After(testQueueRefresh):
@@ -92,7 +90,7 @@ func TestLazyQueue(t *testing.T) {
9290
return
9391
}
9492
}
95-
}()
93+
})
9694

9795
for c := 0; c < testSteps; c++ {
9896
i := rand.Intn(testItems)

consensus/XDPoS/engines/engine_v2/engine.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -750,16 +750,14 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *
750750
start := time.Now()
751751

752752
var wg sync.WaitGroup
753-
wg.Add(len(signatures))
754753
sigErrChan := make(chan error, len(signatures))
755754

756755
for _, signature := range signatures {
757-
go func(sig types.Signature) {
758-
defer wg.Done()
756+
wg.Go(func() {
759757
verified, _, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
760758
ProposedBlockInfo: quorumCert.ProposedBlockInfo,
761759
GapNumber: quorumCert.GapNumber,
762-
}), sig, epochInfo.Masternodes)
760+
}), signature, epochInfo.Masternodes)
763761
if err != nil {
764762
log.Error("[verifyQC] Error while verfying QC message signatures", "Error", err)
765763
sigErrChan <- errors.New("error while verfying QC message signatures")
@@ -770,7 +768,7 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *
770768
sigErrChan <- errors.New("fail to verify QC due to signature mis-match")
771769
return
772770
}
773-
}(signature)
771+
})
774772
}
775773
wg.Wait()
776774
elapsed := time.Since(start)

consensus/XDPoS/engines/engine_v2/syncInfo.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,11 @@ func (x *XDPoS_v2) processSyncInfoPool(chain consensus.ChainReader) {
128128

129129
func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.Signature, candidates []common.Address) error {
130130
var wg sync.WaitGroup
131-
wg.Add(len(signatures))
132131
var haveError error
133132

134133
for _, signature := range signatures {
135-
go func(sig types.Signature) {
136-
defer wg.Done()
137-
verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates)
134+
wg.Go(func() {
135+
verified, _, err := x.verifyMsgSignature(messageHash, signature, candidates)
138136
if err != nil {
139137
log.Error("[verifySignatures] Error while verfying QC message signatures", "error", err)
140138
haveError = errors.New("error while verfying QC message signatures")
@@ -145,7 +143,7 @@ func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.
145143
haveError = errors.New("fail to verify QC due to signature mismatch")
146144
return
147145
}
148-
}(signature)
146+
})
149147
}
150148
wg.Wait()
151149
if haveError != nil {

consensus/XDPoS/engines/engine_v2/timeout.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,6 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
185185
}
186186

187187
var wg sync.WaitGroup
188-
wg.Add(len(signatures))
189-
190188
var mutex sync.Mutex
191189
var haveError error
192190

@@ -196,11 +194,10 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
196194
})
197195

198196
for _, signature := range signatures {
199-
go func(sig types.Signature) {
200-
defer wg.Done()
201-
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochCandidates)
197+
wg.Go(func() {
198+
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, signature, snap.NextEpochCandidates)
202199
if err != nil || !verified {
203-
log.Error("[verifyTC] Error or verification failure", "signature", sig, "error", err)
200+
log.Error("[verifyTC] Error or verification failure", "signature", signature, "error", err)
204201
mutex.Lock() // Lock before accessing haveError
205202
if haveError == nil {
206203
if err != nil {
@@ -213,7 +210,7 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
213210
}
214211
mutex.Unlock() // Unlock after modifying haveError
215212
}
216-
}(signature)
213+
})
217214
}
218215
wg.Wait()
219216
if haveError != nil {

consensus/XDPoS/engines/engine_v2/vote.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,9 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has
163163
emptySigner := common.Address{}
164164
// Filter out non-Master nodes signatures
165165
var wg sync.WaitGroup
166-
wg.Add(len(votes))
167-
for h, vote := range votes {
168-
go func(hash common.Hash, v *types.Vote) {
169-
defer wg.Done()
166+
for _, vote := range votes {
167+
wg.Go(func() {
168+
v := vote.(*types.Vote)
170169
signerAddress := v.GetSigner()
171170
if signerAddress != emptySigner {
172171
// verify that signer belongs to the final masternodes, we have not do so in previous steps
@@ -198,7 +197,7 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has
198197
return
199198
}
200199
v.SetSigner(masterNode)
201-
}(h, vote.(*types.Vote))
200+
})
202201
}
203202
wg.Wait()
204203
elapsed := time.Since(start)

console/console.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ func New(config Config) (*Console, error) {
119119
return nil, err
120120
}
121121

122-
console.wg.Add(1)
123-
go console.interruptHandler()
122+
console.wg.Go(console.interruptHandler)
124123

125124
return console, nil
126125
}
@@ -341,8 +340,6 @@ func (c *Console) Evaluate(statement string) {
341340
// interruptHandler runs in its own goroutine and waits for signals.
342341
// When a signal is received, it interrupts the JS interpreter.
343342
func (c *Console) interruptHandler() {
344-
defer c.wg.Done()
345-
346343
// During Interactive, liner inhibits the signal while it is prompting for
347344
// input. However, the signal will be received while evaluating JS.
348345
//

core/blockchain.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
326326
}
327327

328328
// Start future block processor.
329-
bc.wg.Add(1)
330-
go bc.futureBlocksLoop()
329+
bc.wg.Go(bc.futureBlocksLoop)
331330

332331
return bc, nil
333332
}
@@ -2557,8 +2556,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
25572556

25582557
// futureBlocksLoop processes the 'future block' queue.
25592558
func (bc *BlockChain) futureBlocksLoop() {
2560-
defer bc.wg.Done()
2561-
25622559
futureTimer := time.NewTicker(10 * time.Millisecond)
25632560
defer futureTimer.Stop()
25642561
for {

core/bloombits/scheduler.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by
6262
pend := make(chan uint64, cap(dist))
6363

6464
// Start the pipeline schedulers to forward between user -> distributor -> user
65-
wg.Add(2)
66-
go s.scheduleRequests(sections, dist, pend, quit, wg)
67-
go s.scheduleDeliveries(pend, done, quit, wg)
65+
wg.Go(func() { s.scheduleRequests(sections, dist, pend, quit) })
66+
wg.Go(func() { s.scheduleDeliveries(pend, done, quit) })
6867
}
6968

7069
// reset cleans up any leftovers from previous runs. This is required before a
@@ -84,9 +83,8 @@ func (s *scheduler) reset() {
8483
// scheduleRequests reads section retrieval requests from the input channel,
8584
// deduplicates the stream and pushes unique retrieval tasks into the distribution
8685
// channel for a database or network layer to honour.
87-
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
86+
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}) {
8887
// Clean up the goroutine and pipeline when done
89-
defer wg.Done()
9088
defer close(pend)
9189

9290
// Keep reading and scheduling section requests
@@ -131,9 +129,8 @@ func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend
131129

132130
// scheduleDeliveries reads section acceptance notifications and waits for them
133131
// to be delivered, pushing them into the output data buffer.
134-
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
132+
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}) {
135133
// Clean up the goroutine and pipeline when done
136-
defer wg.Done()
137134
defer close(done)
138135

139136
// Keep reading notifications and scheduling deliveries

0 commit comments

Comments
 (0)