Skip to content

Commit 85f1a37

Browse files
committed
all: use WaigGroup.Go() to simplify code
1 parent 28a4f25 commit 85f1a37

35 files changed

+110
-226
lines changed

accounts/keystore/keystore_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,14 +406,12 @@ func TestImportRace(t *testing.T) {
406406
_, ks2 := tmpKeyStore(t)
407407
var atom atomic.Uint32
408408
var wg sync.WaitGroup
409-
wg.Add(2)
410409
for i := 0; i < 2; i++ {
411-
go func() {
412-
defer wg.Done()
410+
wg.Go(func() {
413411
if _, err := ks2.Import(json, "new", "new"); err != nil {
414412
atom.Add(1)
415413
}
416-
}()
414+
})
417415
}
418416
wg.Wait()
419417
if atom.Load() != 1 {

bmt/bmt_test.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -263,23 +263,21 @@ func TestHasherConcurrency(t *testing.T) {
263263
defer pool.Drain(0)
264264
wg := sync.WaitGroup{}
265265
cycles := 100
266-
wg.Add(maxproccnt * cycles)
267266
errc := make(chan error)
268267

269268
for p := 0; p < maxproccnt; p++ {
270269
for i := 0; i < cycles; i++ {
271-
go func() {
270+
wg.Go(func() {
272271
bmt := New(pool)
273272
n := rand.Intn(4096)
274273
tdata := testDataReader(n)
275274
data := make([]byte, n)
276275
tdata.Read(data)
277276
err := testHasherCorrectness(bmt, hasher, data, n, 128)
278-
wg.Done()
279277
if err != nil {
280278
errc <- err
281279
}
282-
}()
280+
})
283281
}
284282
}
285283
go func() {
@@ -386,18 +384,16 @@ func benchmarkBMTBaseline(n int, t *testing.B) {
386384
for i := 0; i < t.N; i++ {
387385
count := int32((n-1)/hasher().Size() + 1)
388386
wg := sync.WaitGroup{}
389-
wg.Add(maxproccnt)
390387
var i int32
391388
for j := 0; j < maxproccnt; j++ {
392-
go func() {
393-
defer wg.Done()
389+
wg.Go(func() {
394390
h := hasher()
395391
for atomic.AddInt32(&i, 1) < count {
396392
h.Reset()
397393
h.Write(data)
398394
h.Sum(nil)
399395
}
400-
}()
396+
})
401397
}
402398
wg.Wait()
403399
}
@@ -437,15 +433,13 @@ func benchmarkHasherReuse(poolsize, n int, t *testing.B) {
437433
t.ResetTimer()
438434
for i := 0; i < t.N; i++ {
439435
wg := sync.WaitGroup{}
440-
wg.Add(cycles)
441436
for j := 0; j < cycles; j++ {
442-
bmt := New(pool)
443-
go func() {
444-
defer wg.Done()
437+
wg.Go(func() {
438+
bmt := New(pool)
445439
bmt.Reset()
446440
bmt.Write(data)
447441
bmt.Sum(nil)
448-
}()
442+
})
449443
}
450444
wg.Wait()
451445
}

common/prque/lazyqueue_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ func TestLazyQueue(t *testing.T) {
7979
stopCh = make(chan chan struct{})
8080
)
8181
defer wg.Wait()
82-
wg.Add(1)
83-
go func() {
84-
defer wg.Done()
82+
wg.Go(func() {
8583
for {
8684
select {
8785
case <-clock.After(testQueueRefresh):
@@ -92,7 +90,7 @@ func TestLazyQueue(t *testing.T) {
9290
return
9391
}
9492
}
95-
}()
93+
})
9694

9795
for c := 0; c < testSteps; c++ {
9896
i := rand.Intn(testItems)

consensus/XDPoS/engines/engine_v2/engine.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -757,16 +757,14 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *
757757
start := time.Now()
758758

759759
var wg sync.WaitGroup
760-
wg.Add(len(signatures))
761760
sigErrChan := make(chan error, len(signatures))
762761

763762
for _, signature := range signatures {
764-
go func(sig types.Signature) {
765-
defer wg.Done()
763+
wg.Go(func() {
766764
verified, _, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
767765
ProposedBlockInfo: quorumCert.ProposedBlockInfo,
768766
GapNumber: quorumCert.GapNumber,
769-
}), sig, epochInfo.Masternodes)
767+
}), signature, epochInfo.Masternodes)
770768
if err != nil {
771769
log.Error("[verifyQC] Error while verfying QC message signatures", "Error", err)
772770
sigErrChan <- errors.New("error while verfying QC message signatures")
@@ -777,7 +775,7 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *
777775
sigErrChan <- errors.New("fail to verify QC due to signature mis-match")
778776
return
779777
}
780-
}(signature)
778+
})
781779
}
782780
wg.Wait()
783781
elapsed := time.Since(start)

consensus/XDPoS/engines/engine_v2/syncInfo.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,11 @@ func (x *XDPoS_v2) processSyncInfoPool(chain consensus.ChainReader) {
128128

129129
func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.Signature, candidates []common.Address) error {
130130
var wg sync.WaitGroup
131-
wg.Add(len(signatures))
132131
var haveError error
133132

134133
for _, signature := range signatures {
135-
go func(sig types.Signature) {
136-
defer wg.Done()
137-
verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates)
134+
wg.Go(func() {
135+
verified, _, err := x.verifyMsgSignature(messageHash, signature, candidates)
138136
if err != nil {
139137
log.Error("[verifySignatures] Error while verfying QC message signatures", "error", err)
140138
haveError = errors.New("error while verfying QC message signatures")
@@ -145,7 +143,7 @@ func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.
145143
haveError = errors.New("fail to verify QC due to signature mismatch")
146144
return
147145
}
148-
}(signature)
146+
})
149147
}
150148
wg.Wait()
151149
if haveError != nil {

consensus/XDPoS/engines/engine_v2/timeout.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,6 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
185185
}
186186

187187
var wg sync.WaitGroup
188-
wg.Add(len(signatures))
189-
190188
var mutex sync.Mutex
191189
var haveError error
192190

@@ -196,11 +194,10 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
196194
})
197195

198196
for _, signature := range signatures {
199-
go func(sig types.Signature) {
200-
defer wg.Done()
201-
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochCandidates)
197+
wg.Go(func() {
198+
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, signature, snap.NextEpochCandidates)
202199
if err != nil || !verified {
203-
log.Error("[verifyTC] Error or verification failure", "signature", sig, "error", err)
200+
log.Error("[verifyTC] Error or verification failure", "signature", signature, "error", err)
204201
mutex.Lock() // Lock before accessing haveError
205202
if haveError == nil {
206203
if err != nil {
@@ -213,7 +210,7 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
213210
}
214211
mutex.Unlock() // Unlock after modifying haveError
215212
}
216-
}(signature)
213+
})
217214
}
218215
wg.Wait()
219216
if haveError != nil {

consensus/XDPoS/engines/engine_v2/vote.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,9 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has
164164
emptySigner := common.Address{}
165165
// Filter out non-Master nodes signatures
166166
var wg sync.WaitGroup
167-
wg.Add(len(votes))
168-
for h, vote := range votes {
169-
go func(hash common.Hash, v *types.Vote) {
170-
defer wg.Done()
167+
for _, vote := range votes {
168+
wg.Go(func() {
169+
v := vote.(*types.Vote)
171170
signerAddress := v.GetSigner()
172171
if signerAddress != emptySigner {
173172
// verify that signer belongs to the final masternodes, we have not do so in previous steps
@@ -199,7 +198,7 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has
199198
return
200199
}
201200
v.SetSigner(masterNode)
202-
}(h, vote.(*types.Vote))
201+
})
203202
}
204203
wg.Wait()
205204
elapsed := time.Since(start)

console/console.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ func New(config Config) (*Console, error) {
119119
return nil, err
120120
}
121121

122-
console.wg.Add(1)
123-
go console.interruptHandler()
122+
console.wg.Go(console.interruptHandler)
124123

125124
return console, nil
126125
}
@@ -341,8 +340,6 @@ func (c *Console) Evaluate(statement string) {
341340
// interruptHandler runs in its own goroutine and waits for signals.
342341
// When a signal is received, it interrupts the JS interpreter.
343342
func (c *Console) interruptHandler() {
344-
defer c.wg.Done()
345-
346343
// During Interactive, liner inhibits the signal while it is prompting for
347344
// input. However, the signal will be received while evaluating JS.
348345
//

core/blockchain.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
327327
}
328328

329329
// Start future block processor.
330-
bc.wg.Add(1)
331-
go bc.futureBlocksLoop()
330+
bc.wg.Go(bc.futureBlocksLoop)
332331

333332
return bc, nil
334333
}
@@ -2572,8 +2571,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
25722571

25732572
// futureBlocksLoop processes the 'future block' queue.
25742573
func (bc *BlockChain) futureBlocksLoop() {
2575-
defer bc.wg.Done()
2576-
25772574
futureTimer := time.NewTicker(100 * time.Millisecond)
25782575
defer futureTimer.Stop()
25792576

core/bloombits/scheduler.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by
6262
pend := make(chan uint64, cap(dist))
6363

6464
// Start the pipeline schedulers to forward between user -> distributor -> user
65-
wg.Add(2)
66-
go s.scheduleRequests(sections, dist, pend, quit, wg)
67-
go s.scheduleDeliveries(pend, done, quit, wg)
65+
wg.Go(func() { s.scheduleRequests(sections, dist, pend, quit) })
66+
wg.Go(func() { s.scheduleDeliveries(pend, done, quit) })
6867
}
6968

7069
// reset cleans up any leftovers from previous runs. This is required before a
@@ -84,9 +83,8 @@ func (s *scheduler) reset() {
8483
// scheduleRequests reads section retrieval requests from the input channel,
8584
// deduplicates the stream and pushes unique retrieval tasks into the distribution
8685
// channel for a database or network layer to honour.
87-
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
86+
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}) {
8887
// Clean up the goroutine and pipeline when done
89-
defer wg.Done()
9088
defer close(pend)
9189

9290
// Keep reading and scheduling section requests
@@ -131,9 +129,8 @@ func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend
131129

132130
// scheduleDeliveries reads section acceptance notifications and waits for them
133131
// to be delivered, pushing them into the output data buffer.
134-
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
132+
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}) {
135133
// Clean up the goroutine and pipeline when done
136-
defer wg.Done()
137134
defer close(done)
138135

139136
// Keep reading notifications and scheduling deliveries

0 commit comments

Comments
 (0)